You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/16 02:10:18 UTC
[incubator-seatunnel] branch dev updated: [Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 45c542c95 [Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)
45c542c95 is described below
commit 45c542c95ebec1f28aa059dc1d1a9462caeacae4
Author: 巧克力黑 <ja...@aliyun.com>
AuthorDate: Tue Aug 16 10:10:12 2022 +0800
[Connector-v1]Self-Achieved Redis Proxy which is not support redis function of "info replication" (#2389)
* self achieved redis proxy
---
docs/en/connector/sink/Redis.md | 6 +++
docs/en/connector/source/Redis.md | 28 ++++++++------
.../seatunnel/spark/redis/common/Constants.scala | 2 +
.../seatunnel/spark/redis/common/RedisUtil.scala | 44 ++++++++++++++++++++++
...ts.scala => SelfAchievedRedisProxyConfig.scala} | 30 ++++-----------
.../apache/seatunnel/spark/redis/sink/Redis.scala | 19 ++++------
.../seatunnel/spark/redis/source/Redis.scala | 19 ++++------
7 files changed, 93 insertions(+), 55 deletions(-)
diff --git a/docs/en/connector/sink/Redis.md b/docs/en/connector/sink/Redis.md
index 85e89061a..b8860b36b 100644
--- a/docs/en/connector/sink/Redis.md
+++ b/docs/en/connector/sink/Redis.md
@@ -30,6 +30,7 @@ Engine Supported and plugin name
| zset_name | string | no | |
| timeout | int | no | 2000 |
| ttl | int | no | 0 |
+| is_self_achieved | boolean | no | false |
### host [string]
@@ -75,6 +76,10 @@ if redis data type is SET must config set name
redis data expiration ttl, 0 means no expiration.
+### is_self_achieved [boolean]
+
+Set to `true` when Redis is accessed through a self-implemented Redis proxy that does not support the `INFO replication` command. In that case the configured host is treated as the only Redis node.
+
## Examples
```bash
@@ -85,5 +90,6 @@ redis {
db_num = 1
data_type = "HASH"
hash_name = "test"
+ is_self_achieved = false
}
```
diff --git a/docs/en/connector/source/Redis.md b/docs/en/connector/source/Redis.md
index 35dfff5b1..026fbfb11 100644
--- a/docs/en/connector/source/Redis.md
+++ b/docs/en/connector/source/Redis.md
@@ -17,17 +17,18 @@ Engine Supported and plugin name
## Options
-| name | type | required | default value |
-|---------------------|----------|----------|---------------|
-| host | string | no | "localhost" |
-| port | int | no | 6379 |
-| auth | string | no | |
-| db_num | int | no | 0 |
-| keys_or_key_pattern | string | yes | |
-| partition_num | int | no | 3 |
-| data_type | string | no | "KV" |
-| timeout | int | no | 2000 |
-| common-options | string | yes | |
+| name | type | required | default value |
+|---------------------|---------|----------|---------------|
+| host | string | no | "localhost" |
+| port | int | no | 6379 |
+| auth | string | no | |
+| db_num | int | no | 0 |
+| keys_or_key_pattern | string | yes | |
+| partition_num | int | no | 3 |
+| data_type | string | no | "KV" |
+| timeout | int | no | 2000 |
+| common-options | string | yes | |
+| is_self_achieved | boolean | no | false |
### host [string]
@@ -65,6 +66,10 @@ Redis timeout
Source Plugin common parameters, refer to [Source Plugin](common-options.mdx) for details
+### is_self_achieved [boolean]
+
+Set to `true` when Redis is accessed through a self-implemented Redis proxy that does not support the `INFO replication` command. In that case the configured host is treated as the only Redis node.
+
## Example
```bash
@@ -77,6 +82,7 @@ redis {
partition_num = 20
data_type = "HASH"
result_table_name = "hash_result_table"
+ is_self_achieved = false
}
```
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
index 9e09e55ac..e3f7e3762 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
@@ -32,6 +32,7 @@ object Constants extends Serializable {
final val ZSET_NAME = "zset_name"
final val LIST_NAME = "list_name"
final val TTL = "ttl"
+ final val IS_SELF_ACHIEVED_REDIS = "is_self_achieved"
final val DEFAULT_HOST = "localhost"
final val DEFAULT_PORT = 6379
@@ -41,5 +42,6 @@ object Constants extends Serializable {
final val DEFAULT_PARTITION_NUM = 3
final val DEFAULT_TIMEOUT = 2000
final val DEFAULT_TTL = 0
+ final val DEFAULT_IS_SELF_ACHIEVED_REDIS = false
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala
new file mode 100644
index 000000000..360c205b1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/RedisUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.redis.common
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.seatunnel.shade.com.typesafe.config.Config
+import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DB_NUM, HOST, PORT, TIMEOUT}
+
+object RedisUtil {
+
+  /**
+   * Builds the spark-redis connection config for the endpoint described by `config`.
+   *
+   * The endpoint is assembled once and then wrapped in the requested config type, so the
+   * proxy and non-proxy paths always use identical connection settings.
+   *
+   * @param isSelfAchieved true when Redis is reached through a self-implemented proxy that does
+   *                       not support `INFO replication`
+   * @param config         plugin config; HOST, PORT, DB_NUM and TIMEOUT are read directly,
+   *                       AUTH may be explicitly null
+   * @return a [[SelfAchievedRedisProxyConfig]] when `isSelfAchieved` is true, otherwise a plain
+   *         [[RedisConfig]]
+   */
+  def getRedisConfig(isSelfAchieved: Boolean, config: Config): RedisConfig = {
+    val endpoint = RedisEndpoint(
+      host = config.getString(HOST),
+      port = config.getInt(PORT),
+      // getString throws on a null value, so an explicitly null auth is passed through as null
+      auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
+      dbNum = config.getInt(DB_NUM),
+      timeout = config.getInt(TIMEOUT)
+    )
+    if (isSelfAchieved) new SelfAchievedRedisProxyConfig(endpoint) else new RedisConfig(endpoint)
+  }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
similarity index 53%
copy from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
copy to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
index 9e09e55ac..abf65c093 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/Constants.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/common/SelfAchievedRedisProxyConfig.scala
@@ -17,29 +17,15 @@
package org.apache.seatunnel.spark.redis.common
-object Constants extends Serializable {
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, RedisNode}
- final val HOST = "host"
- final val PORT = "port"
- final val AUTH = "auth"
- final val DB_NUM = "db_num"
- final val KEYS_OR_KEY_PATTERN = "keys_or_key_pattern"
- final val PARTITION_NUM = "partition_num"
- final val TIMEOUT = "timeout"
- final val DATA_TYPE = "data_type"
- final val HASH_NAME = "hash_name"
- final val SET_NAME = "set_name"
- final val ZSET_NAME = "zset_name"
- final val LIST_NAME = "list_name"
- final val TTL = "ttl"
+/**
+ * [[RedisConfig]] used when Redis is reached through a self-implemented proxy that does not
+ * support `INFO replication` (selected by the `is_self_achieved` option).
+ *
+ * Node discovery is overridden so that the configured endpoint is reported as the only node.
+ * NOTE(review): the RedisNode arguments below presumably mean (startSlot = 0, endSlot = 16383,
+ * idx = 0, total = 1), i.e. a single node owning every slot — confirm against spark-redis.
+ */
+class SelfAchievedRedisProxyConfig(initialHost: RedisEndpoint) extends RedisConfig(initialHost) {
+ // NOTE(review): presumably the same value the parent derives from the endpoint — confirm whether this override is needed
+ override val initialAddr: String = initialHost.host
- final val DEFAULT_HOST = "localhost"
- final val DEFAULT_PORT = 6379
- final val DEFAULT_AUTH = null
- final val DEFAULT_DB_NUM = 0
- final val DEFAULT_DATA_TYPE = "KV"
- final val DEFAULT_PARTITION_NUM = 3
- final val DEFAULT_TIMEOUT = 2000
- final val DEFAULT_TTL = 0
+ // Both the host list and the node list come from the single-node getNodes below
+ override val hosts: Array[RedisNode] = getNodes(initialHost)
+ override val nodes: Array[RedisNode] = getNodes(initialHost)
+ // Fixed single-node topology built only from the given endpoint; no server round-trip is made here
+ override def getNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
+ Array(RedisNode(initialHost, 0, 16383, 0, 1))
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
index 44ac09db8..d53887b08 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
@@ -17,13 +17,13 @@
package org.apache.seatunnel.spark.redis.sink
-import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import com.redislabs.provider.redis.{RedisConfig, toRedisContext}
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, DEFAULT_TTL, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME, TTL}
-import org.apache.seatunnel.spark.redis.common.RedisDataType
+import org.apache.seatunnel.spark.redis.common.Constants._
+import org.apache.seatunnel.spark.redis.common.{RedisDataType, RedisUtil}
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row}
@@ -35,13 +35,9 @@ class Redis extends SparkBatchSink with Logging {
var redisDataType: RedisDataType.Value = _
override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
- val redisConfigs = new RedisConfig(RedisEndpoint(
- host = config.getString(HOST),
- port = config.getInt(PORT),
- auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
- dbNum = config.getInt(DB_NUM),
- timeout = config.getInt(TIMEOUT)
- ))
+ val isSelfAchieved = if (config.getIsNull(IS_SELF_ACHIEVED_REDIS)) false else config.getBoolean(IS_SELF_ACHIEVED_REDIS)
+
+ val redisConfigs = RedisUtil.getRedisConfig(isSelfAchieved, config)
redisDataType = RedisDataType.withName(config.getString(DATA_TYPE))
implicit val sc: SparkContext = env.getSparkSession.sparkContext
@@ -79,7 +75,8 @@ class Redis extends SparkBatchSink with Logging {
DB_NUM -> DEFAULT_DB_NUM,
DATA_TYPE -> DEFAULT_DATA_TYPE,
TIMEOUT -> DEFAULT_TIMEOUT,
- TTL -> DEFAULT_TTL
+ TTL -> DEFAULT_TTL,
+ IS_SELF_ACHIEVED_REDIS -> DEFAULT_IS_SELF_ACHIEVED_REDIS
))
config = config.withFallback(defaultConfig)
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
index 2cf37b9a1..14f1503aa 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
@@ -17,13 +17,13 @@
package org.apache.seatunnel.spark.redis.source
-import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import com.redislabs.provider.redis.{RedisConfig, toRedisContext}
import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.seatunnel.spark.redis.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PARTITION_NUM, DEFAULT_PORT, DEFAULT_TIMEOUT, HOST, KEYS_OR_KEY_PATTERN, PARTITION_NUM, PORT, TIMEOUT}
-import org.apache.seatunnel.spark.redis.common.RedisDataType
+import org.apache.seatunnel.spark.redis.common.Constants._
+import org.apache.seatunnel.spark.redis.common.{RedisDataType, RedisUtil}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
@@ -52,7 +52,8 @@ class Redis extends SparkBatchSource {
DB_NUM -> DEFAULT_DB_NUM,
DATA_TYPE -> DEFAULT_DATA_TYPE,
PARTITION_NUM -> DEFAULT_PARTITION_NUM,
- TIMEOUT -> DEFAULT_TIMEOUT
+ TIMEOUT -> DEFAULT_TIMEOUT,
+ IS_SELF_ACHIEVED_REDIS -> DEFAULT_IS_SELF_ACHIEVED_REDIS
))
config = config.withFallback(defaultConfig)
}
@@ -65,13 +66,9 @@ class Redis extends SparkBatchSource {
*/
override def getData(env: SparkEnvironment): Dataset[Row] = {
// Get data from redis through keys and combine it into a dataset
- val redisConfigs = new RedisConfig(RedisEndpoint(
- host = config.getString(HOST),
- port = config.getInt(PORT),
- auth = config.getString(AUTH),
- dbNum = config.getInt(DB_NUM),
- timeout = config.getInt(TIMEOUT)
- ))
+ val isSelfAchieved = if (config.getIsNull(IS_SELF_ACHIEVED_REDIS)) false else config.getBoolean(IS_SELF_ACHIEVED_REDIS)
+
+ val redisConfigs = RedisUtil.getRedisConfig(isSelfAchieved, config)
redisDataType = RedisDataType.withName(config.getString(DATA_TYPE).toUpperCase)
val keysOrKeyPattern = config.getString(KEYS_OR_KEY_PATTERN)