You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ic...@apache.org on 2022/10/31 10:19:53 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c7275a49c [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
c7275a49c is described below
commit c7275a49cc7434122d3d16ac138f7715a781136b
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Oct 31 18:19:46 2022 +0800
[Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
* [Feature][Connector-V2][Redis] Redis source & sink connector supports redis cluster mode connection and user authentication
* [Feature][Connector-V2][Redis] Update docs
* [Improve][Connector-V2][Redis] Support multi nodes setting in redis cluster mode
* [Improve][Connector-V2][Redis] Support parse mode for hash keys
* [Improve][Connector-V2][Redis] Update redis source doc
---
docs/en/connector-v2/sink/Redis.md | 43 +++++---
docs/en/connector-v2/source/Redis.md | 110 +++++++++++++++++++--
.../seatunnel/redis/config/JedisWrapper.java | 89 +++++++++++++++++
.../seatunnel/redis/config/RedisConfig.java | 14 +++
.../seatunnel/redis/config/RedisParameters.java | 73 +++++++++++++-
.../seatunnel/redis/source/RedisSourceReader.java | 21 +++-
6 files changed, 324 insertions(+), 26 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md
index 405e42484..9c0fa3e95 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -13,15 +13,18 @@ Used to write data to Redis.
## Options
-| name | type | required | default value |
-|-------------- |--------|----------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| key | string | yes | - |
-| data_type | string | yes | - |
-| auth | string | no | - |
-| format | string | no | json |
-| common-options| | no | - |
+| name | type | required | default value |
+|----------------|--------|----------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| key | string | yes | - |
+| data_type | string | yes | - |
+| user | string | no | - |
+| auth | string | no | - |
+| mode | string | no | - |
+| auth | list | no | - |
+| format | string | no | json |
+| common-options | | no | - |
### host [string]
@@ -75,11 +78,25 @@ Redis data types, support `key` `hash` `list` `set` `zset`
- zset
> Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.
-### auth [String]
+### user [string]
+
+redis authentication user, you need it when you connect to an encrypted cluster
+
+### auth [string]
Redis authentication password, you need it when you connect to an encrypted cluster
-### format [String]
+### mode [string]
+
+redis mode, `single` or `cluster`, default is `single`
+
+### nodes [list]
+
+redis nodes information, used in cluster mode, must like as the following format:
+
+[host1:port1, host2:port2]
+
+### format [string]
The format of upstream data, now only support `json`, `text` will be supported later, default `json`.
@@ -121,3 +138,7 @@ simple:
### 2.2.0-beta 2022-09-26
- Add Redis Sink Connector
+
+### next version
+
+- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md
index 974aa22aa..589efc01b 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,16 +17,20 @@ Used to read data from Redis.
## Options
-| name | type | required | default value |
-|--------------- |--------|----------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| keys | string | yes | - |
-| data_type | string | yes | - |
-| auth | string | No | - |
-| schema | config | No | - |
-| format | string | No | json |
-| common-options | | no | - |
+| name | type | required | default value |
+|---------------------|--------|----------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| keys | string | yes | - |
+| data_type | string | yes | - |
+| user | string | no | - |
+| auth | string | no | - |
+| mode | string | no | - |
+| hash_key_parse_mode | string | no | all |
+| nodes | list | no | - |
+| schema | config | no | - |
+| format | string | no | json |
+| common-options | | no | - |
### host [string]
@@ -36,6 +40,74 @@ redis host
redis port
+### hash_key_parse_mode [string]
+
+hash key parse mode, support `all` `kv`, used to tell connector how to parse hash key.
+
+when setting it to `all`, connector will treat the value of hash key as a row and use the schema config to parse it, when setting it to `kv`, connector will treat each kv in hash key as a row and use the schema config to parse it:
+
+for example, if the value of hash key is the following shown:
+
+```text
+{
+ "001": {
+ "name": "tyrantlucifer",
+ "age": 26
+ },
+ "002": {
+ "name": "Zongwen",
+ "age": 26
+ }
+}
+
+```
+
+if hash_key_parse_mode is `all` and schema config as the following shown, it will generate the following data:
+
+```hocon
+
+schema {
+ fields {
+ 001 {
+ name = string
+ age = int
+ }
+ 002 {
+ name = string
+ age = int
+ }
+ }
+}
+
+```
+
+| 001 | 002 |
+|---------------------------------|---------------------------|
+| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
+
+if hash_key_parse_mode is `kv` and schema config as the following shown, it will generate the following data:
+
+```hocon
+
+schema {
+ fields {
+ hash_key = string
+ name = string
+ age = int
+ }
+}
+
+```
+
+| hash_key | name | age |
+|----------|---------------|-----|
+| 001 | tyrantlucifer | 26 |
+| 002 | Zongwen | 26 |
+
+each kv that in hash key it will be treated as a row and send it to upstream.
+
+**Tips: connector will use the first field information of schema config as the field name of each k that in each kv**
+
### keys [string]
keys pattern
@@ -67,10 +139,24 @@ redis data types, support `key` `hash` `list` `set` `zset`
> Each element in the sorted set will be sent downstream as a single row of data
> For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
+### user [string]
+
+redis authentication user, you need it when you connect to an encrypted cluster
+
### auth [string]
redis authentication password, you need it when you connect to an encrypted cluster
+### mode [string]
+
+redis mode, `single` or `cluster`, default is `single`
+
+### nodes [list]
+
+redis nodes information, used in cluster mode, must like as the following format:
+
+[host1:port1, host2:port2]
+
### format [string]
the format of upstream data, now only support `json` `text`, default `json`.
@@ -166,3 +252,7 @@ simple:
### 2.2.0-beta 2022-09-26
- Add Redis Source Connector
+
+### next version
+
+- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
new file mode 100644
index 000000000..8348f3561
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import lombok.NonNull;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JedisWrapper extends Jedis {
+ private final JedisCluster jedisCluster;
+
+ public JedisWrapper(@NonNull JedisCluster jedisCluster) {
+ this.jedisCluster = jedisCluster;
+ }
+
+ @Override
+ public String set(final String key, final String value) {
+ return jedisCluster.set(key, value);
+ }
+
+ @Override
+ public String get(final String key) {
+ return jedisCluster.get(key);
+ }
+
+ @Override
+ public long hset(final String key, final Map<String, String> hash) {
+ return jedisCluster.hset(key, hash);
+ }
+
+ @Override
+ public Map<String, String> hgetAll(final String key) {
+ return jedisCluster.hgetAll(key);
+ }
+
+ @Override
+ public long lpush(final String key, final String... strings) {
+ return jedisCluster.lpush(key, strings);
+ }
+
+ @Override
+ public List<String> lrange(final String key, final long start, final long stop) {
+ return jedisCluster.lrange(key, start, stop);
+ }
+
+ @Override
+ public long sadd(final String key, final String... members) {
+ return jedisCluster.sadd(key, members);
+ }
+
+ @Override
+ public Set<String> smembers(final String key) {
+ return jedisCluster.smembers(key);
+ }
+
+ @Override
+ public long zadd(final String key, final double score, final String member) {
+ return jedisCluster.zadd(key, score, member);
+ }
+
+ @Override
+ public List<String> zrange(final String key, final long start, final long stop) {
+ return jedisCluster.zrange(key, start, stop);
+ }
+
+ @Override
+ public void close() {
+ jedisCluster.close();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index e158449b4..11ee665a9 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -21,8 +21,22 @@ public class RedisConfig {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String AUTH = "auth";
+ public static final String USER = "user";
public static final String KEY_PATTERN = "keys";
public static final String KEY = "key";
public static final String DATA_TYPE = "data_type";
public static final String FORMAT = "format";
+ public static final String MODE = "mode";
+ public static final String NODES = "nodes";
+ public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";
+
+ public enum RedisMode {
+ SINGLE,
+ CLUSTER;
+ }
+
+ public enum HashKeyParseMode {
+ ALL,
+ KV;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index bc5147505..75b74d206 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -21,18 +21,28 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
+import redis.clients.jedis.ConnectionPoolConfig;
+import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
@Data
public class RedisParameters implements Serializable {
private String host;
private int port;
private String auth = "";
+ private String user = "";
private String keysPattern;
private String keyField;
private RedisDataType redisDataType;
+ private RedisConfig.RedisMode mode;
+ private RedisConfig.HashKeyParseMode hashKeyParseMode;
+ private List<String> redisNodes = Collections.emptyList();
public void buildWithConfig(Config config) {
// set host
@@ -43,6 +53,27 @@ public class RedisParameters implements Serializable {
if (config.hasPath(RedisConfig.AUTH)) {
this.auth = config.getString(RedisConfig.AUTH);
}
+ // set user
+ if (config.hasPath(RedisConfig.USER)) {
+ this.user = config.getString(RedisConfig.USER);
+ }
+ // set mode
+ if (config.hasPath(RedisConfig.MODE)) {
+ this.mode = RedisConfig.RedisMode.valueOf(config.getString(RedisConfig.MODE));
+ } else {
+ this.mode = RedisConfig.RedisMode.SINGLE;
+ }
+ // set hash key mode
+ if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE)) {
+ this.hashKeyParseMode = RedisConfig.HashKeyParseMode
+ .valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE).toUpperCase());
+ } else {
+ this.hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL;
+ }
+ // set redis nodes information
+ if (config.hasPath(RedisConfig.NODES)) {
+ this.redisNodes = config.getStringList(RedisConfig.NODES);
+ }
// set key
if (config.hasPath(RedisConfig.KEY)) {
this.keyField = config.getString(RedisConfig.KEY);
@@ -61,10 +92,44 @@ public class RedisParameters implements Serializable {
}
public Jedis buildJedis() {
- Jedis jedis = new Jedis(host, port);
- if (StringUtils.isNotBlank(auth)) {
- jedis.auth(auth);
+ switch (mode) {
+ case SINGLE:
+ Jedis jedis = new Jedis(host, port);
+ if (StringUtils.isNotBlank(auth)) {
+ jedis.auth(auth);
+ }
+ if (StringUtils.isNotBlank(user)) {
+ jedis.aclSetUser(user);
+ }
+ return jedis;
+ case CLUSTER:
+ HashSet<HostAndPort> nodes = new HashSet<>();
+ HostAndPort node = new HostAndPort(host, port);
+ nodes.add(node);
+ if (!redisNodes.isEmpty()) {
+ for (String redisNode : redisNodes) {
+ String[] splits = redisNode.split(":");
+ if (splits.length != 2) {
+ throw new IllegalArgumentException("Invalid redis node information," +
+ "redis node information must like as the following: [host:port]");
+ }
+ HostAndPort hostAndPort = new HostAndPort(splits[0], Integer.parseInt(splits[1]));
+ nodes.add(hostAndPort);
+ }
+ }
+ ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig();
+ JedisCluster jedisCluster;
+ if (StringUtils.isNotBlank(auth)) {
+ jedisCluster = new JedisCluster(nodes, JedisCluster.DEFAULT_TIMEOUT,
+ JedisCluster.DEFAULT_TIMEOUT, JedisCluster.DEFAULT_MAX_ATTEMPTS,
+ auth, connectionPoolConfig);
+ } else {
+ jedisCluster = new JedisCluster(nodes);
+ }
+ return new JedisWrapper(jedisCluster);
+ default:
+ // do nothing
+ throw new IllegalArgumentException("Not support this redis mode");
}
- return jedis;
}
}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index ada38fe6d..74f862ed8 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -19,9 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.redis.source;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
@@ -29,6 +33,7 @@ import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -66,7 +71,21 @@ public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
if (deserializationSchema == null) {
output.collect(new SeaTunnelRow(new Object[]{value}));
} else {
- deserializationSchema.deserialize(value.getBytes(), output);
+ if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV &&
+ redisDataType == RedisDataType.HASH) {
+ // Treat each key-value pair in the hash-key as one piece of data
+ Map<String, String> recordsMap = JsonUtils.toMap(value);
+ for (Map.Entry<String, String> entry : recordsMap.entrySet()) {
+ String k = entry.getKey();
+ String v = entry.getValue();
+ Map<String, String> valuesMap = JsonUtils.toMap(v);
+ SeaTunnelDataType<SeaTunnelRow> seaTunnelRowType = deserializationSchema.getProducedType();
+ valuesMap.put(((SeaTunnelRowType) seaTunnelRowType).getFieldName(0), k);
+ deserializationSchema.deserialize(JsonUtils.toJsonString(valuesMap).getBytes(), output);
+ }
+ } else {
+ deserializationSchema.deserialize(value.getBytes(), output);
+ }
}
}
}