You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/16 02:10:18 UTC

[incubator-seatunnel] branch dev updated: [Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45c542c95 [Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)
45c542c95 is described below

commit 45c542c95ebec1f28aa059dc1d1a9462caeacae4
Author: 巧克力黑 <ja...@aliyun.com>
AuthorDate: Tue Aug 16 10:10:12 2022 +0800

    [Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)
    
    * self achieved redis proxy
---
 docs/en/connector/sink/Redis.md                    |  6 +++
 docs/en/connector/source/Redis.md                  | 28 ++++++++------
 .../seatunnel/spark/redis/common/Constants.scala   |  2 +
 .../seatunnel/spark/redis/common/RedisUtil.scala   | 44 ++++++++++++++++++++++
 ...ts.scala => SelfAchievedRedisProxyConfig.scala} | 30 ++++-----------
 .../apache/seatunnel/spark/redis/sink/Redis.scala  | 19 ++++------
 .../seatunnel/spark/redis/source/Redis.scala       | 19 ++++------
 7 files changed, 93 insertions(+), 55 deletions(-)

diff --git a/docs/en/connector/sink/Redis.md b/docs/en/connector/sink/Redis.md
index 85e89061a..b8860b36b 100644
--- a/docs/en/connector/sink/Redis.md
+++ b/docs/en/connector/sink/Redis.md
@@ -30,6 +30,7 @@ Engine Supported and plugin name
 | zset_name | string | no       |               |
 | timeout   | int    | no       | 2000          |
 | ttl       | int    | no       | 0             |
+| is_self_achieved    | boolean | no       | false         |
 
 ### host [string]
 
@@ -75,6 +76,10 @@ if redis data type is SET must config set name
 
 redis data expiration ttl, 0 means no expiration.
 
+### is_self_achieved [boolean]
+
+Set this to `true` when Redis is accessed through a self-achieved (self-implemented) Redis proxy that does not support the Redis `info replication` command.
+
 ## Examples
 
 ```bash
@@ -85,5 +90,6 @@ redis {
   db_num = 1
   data_type = "HASH"
   hash_name = "test"
+  is_self_achieved = false
 }
 ```
diff --git a/docs/en/connector/source/Redis.md b/docs/en/connector/source/Redis.md
index 35dfff5b1..026fbfb11 100644
--- a/docs/en/connector/source/Redis.md
+++ b/docs/en/connector/source/Redis.md
@@ -17,17 +17,18 @@ Engine Supported and plugin name
 
 ## Options
 
-| name                | type     | required | default value |
-|---------------------|----------|----------|---------------|
-| host                | string   | no       | "localhost"   |
-| port                | int      | no       | 6379          |
-| auth                | string   | no       |               |
-| db_num              | int      | no       | 0             |
-| keys_or_key_pattern | string   | yes      |               |
-| partition_num       | int      | no       | 3             |
-| data_type           | string   | no       | "KV"          |
-| timeout             | int      | no       | 2000          |
-| common-options      | string   | yes      |               |
+| name                | type    | required | default value |
+|---------------------|---------|----------|---------------|
+| host                | string  | no       | "localhost"   |
+| port                | int     | no       | 6379          |
+| auth                | string  | no       |               |
+| db_num              | int     | no       | 0             |
+| keys_or_key_pattern | string  | yes      |               |
+| partition_num       | int     | no       | 3             |
+| data_type           | string  | no       | "KV"          |
+| timeout             | int     | no       | 2000          |
+| common-options      | string  | yes      |               |
+| is_self_achieved    | boolean | no       | false         |
 
 ### host [string]
 
@@ -65,6 +66,10 @@ Redis timeout
 
 Source Plugin common parameters, refer to [Source Plugin](common-options.mdx) for details
 
+### is_self_achieved [boolean]
+
+Set this to `true` when Redis is accessed through a self-achieved (self-implemented) Redis proxy that does not support the Redis `info replication` command.
+
 ## Example
 
 ```bash
@@ -77,6 +82,7 @@ redis {
   partition_num = 20
   data_type = "HASH"
   result_table_name = "hash_result_table"
+  is_self_achieved = false
 }
 ```
 
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
index 9e09e55ac..e3f7e3762 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
@@ -32,6 +32,7 @@ object Constants extends Serializable {
   final val ZSET_NAME = "zset_name"
   final val LIST_NAME = "list_name"
   final val TTL = "ttl"
+  final val IS_SELF_ACHIEVED_REDIS = "is_self_achieved"
 
   final val DEFAULT_HOST = "localhost"
   final val DEFAULT_PORT = 6379
@@ -41,5 +42,6 @@ object Constants extends Serializable {
   final val DEFAULT_PARTITION_NUM = 3
   final val DEFAULT_TIMEOUT = 2000
   final val DEFAULT_TTL = 0
+  final val DEFAULT_IS_SELF_ACHIEVED_REDIS = false
 
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala
new file mode 100644
index 000000000..360c205b1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.redis.common
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.seatunnel.shade.com.typesafe.config.Config
+import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DB_NUM, HOST, PORT, TIMEOUT}
+
+object RedisUtil {
+
+  /**
+   * Builds the [[RedisConfig]] for the endpoint described by the HOST, PORT, AUTH, DB_NUM and
+   * TIMEOUT entries of `config`; an AUTH entry that is null is forwarded as `null`.
+   *
+   * @param isSelfAchieved when true a [[SelfAchievedRedisProxyConfig]] is returned,
+   *                       otherwise a plain [[RedisConfig]]
+   * @param config         configuration the endpoint settings are read from
+   */
+  def getRedisConfig(isSelfAchieved: Boolean, config: Config): RedisConfig = {
+    val endpoint = RedisEndpoint(
+      host = config.getString(HOST),
+      port = config.getInt(PORT),
+      auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
+      dbNum = config.getInt(DB_NUM),
+      timeout = config.getInt(TIMEOUT))
+    // Both variants wrap the same endpoint; only the RedisConfig class differs.
+    if (isSelfAchieved) new SelfAchievedRedisProxyConfig(endpoint) else new RedisConfig(endpoint)
+  }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
copy to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
index 9e09e55ac..abf65c093 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
@@ -17,29 +17,15 @@
 
 package org.apache.seatunnel.spark.redis.common
 
-object Constants extends Serializable {
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, RedisNode}
 
-  final val HOST = "host"
-  final val PORT = "port"
-  final val AUTH = "auth"
-  final val DB_NUM = "db_num"
-  final val KEYS_OR_KEY_PATTERN = "keys_or_key_pattern"
-  final val PARTITION_NUM = "partition_num"
-  final val TIMEOUT = "timeout"
-  final val DATA_TYPE = "data_type"
-  final val HASH_NAME = "hash_name"
-  final val SET_NAME = "set_name"
-  final val ZSET_NAME = "zset_name"
-  final val LIST_NAME = "list_name"
-  final val TTL = "ttl"
+class SelfAchievedRedisProxyConfig(initialHost: RedisEndpoint) extends RedisConfig(initialHost) {
+  override val initialAddr: String = initialHost.host
 
-  final val DEFAULT_HOST = "localhost"
-  final val DEFAULT_PORT = 6379
-  final val DEFAULT_AUTH = null
-  final val DEFAULT_DB_NUM = 0
-  final val DEFAULT_DATA_TYPE = "KV"
-  final val DEFAULT_PARTITION_NUM = 3
-  final val DEFAULT_TIMEOUT = 2000
-  final val DEFAULT_TTL = 0
+  override val hosts: Array[RedisNode] = getNodes(initialHost)
+  override val nodes: Array[RedisNode] = getNodes(initialHost)
 
+  override def getNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
+    Array(RedisNode(initialHost, 0, 16383, 0, 1))
+  }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
index 44ac09db8..d53887b08 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.spark.redis.sink
 
-import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import com.redislabs.provider.redis.{RedisConfig, toRedisContext}
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, DEFAULT_TTL, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME, TTL}
-import org.apache.seatunnel.spark.redis.common.RedisDataType
+import org.apache.seatunnel.spark.redis.common.Constants._
+import org.apache.seatunnel.spark.redis.common.{RedisDataType, RedisUtil}
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Row}
@@ -35,13 +35,9 @@ class Redis extends SparkBatchSink with Logging {
   var redisDataType: RedisDataType.Value = _
 
   override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
-    val redisConfigs = new RedisConfig(RedisEndpoint(
-      host = config.getString(HOST),
-      port = config.getInt(PORT),
-      auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
-      dbNum = config.getInt(DB_NUM),
-      timeout = config.getInt(TIMEOUT)
-    ))
+    val isSelfAchieved = if (config.getIsNull(IS_SELF_ACHIEVED_REDIS)) false else config.getBoolean(IS_SELF_ACHIEVED_REDIS)
+
+    val redisConfigs = RedisUtil.getRedisConfig(isSelfAchieved, config)
 
     redisDataType = RedisDataType.withName(config.getString(DATA_TYPE))
     implicit val sc: SparkContext = env.getSparkSession.sparkContext
@@ -79,7 +75,8 @@ class Redis extends SparkBatchSink with Logging {
         DB_NUM -> DEFAULT_DB_NUM,
         DATA_TYPE -> DEFAULT_DATA_TYPE,
         TIMEOUT -> DEFAULT_TIMEOUT,
-        TTL -> DEFAULT_TTL
+        TTL -> DEFAULT_TTL,
+        IS_SELF_ACHIEVED_REDIS -> DEFAULT_IS_SELF_ACHIEVED_REDIS
       ))
     config = config.withFallback(defaultConfig)
   }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
index 2cf37b9a1..14f1503aa 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.spark.redis.source
 
-import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import com.redislabs.provider.redis.{RedisConfig, toRedisContext}
 import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PARTITION_NUM, DEFAULT_PORT, DEFAULT_TIMEOUT, HOST, KEYS_OR_KEY_PATTERN, PARTITION_NUM, PORT, TIMEOUT}
-import org.apache.seatunnel.spark.redis.common.RedisDataType
+import org.apache.seatunnel.spark.redis.common.Constants._
+import org.apache.seatunnel.spark.redis.common.{RedisDataType, RedisUtil}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row}
@@ -52,7 +52,8 @@ class Redis extends SparkBatchSource {
         DB_NUM -> DEFAULT_DB_NUM,
         DATA_TYPE -> DEFAULT_DATA_TYPE,
         PARTITION_NUM -> DEFAULT_PARTITION_NUM,
-        TIMEOUT -> DEFAULT_TIMEOUT
+        TIMEOUT -> DEFAULT_TIMEOUT,
+        IS_SELF_ACHIEVED_REDIS -> DEFAULT_IS_SELF_ACHIEVED_REDIS
       ))
     config = config.withFallback(defaultConfig)
   }
@@ -65,13 +66,9 @@ class Redis extends SparkBatchSource {
    */
   override def getData(env: SparkEnvironment): Dataset[Row] = {
     // Get data from redis through keys and combine it into a dataset
-    val redisConfigs = new RedisConfig(RedisEndpoint(
-      host = config.getString(HOST),
-      port = config.getInt(PORT),
-      auth = config.getString(AUTH),
-      dbNum = config.getInt(DB_NUM),
-      timeout = config.getInt(TIMEOUT)
-    ))
+    val isSelfAchieved = if (config.getIsNull(IS_SELF_ACHIEVED_REDIS)) false else config.getBoolean(IS_SELF_ACHIEVED_REDIS)
+
+    val redisConfigs = RedisUtil.getRedisConfig(isSelfAchieved, config)
 
     redisDataType = RedisDataType.withName(config.getString(DATA_TYPE).toUpperCase)
     val keysOrKeyPattern = config.getString(KEYS_OR_KEY_PATTERN)