You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ic...@apache.org on 2022/10/31 10:19:53 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)

This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c7275a49c [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
c7275a49c is described below

commit c7275a49cc7434122d3d16ac138f7715a781136b
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Oct 31 18:19:46 2022 +0800

    [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
    
    * [Feature][Connector-V2][Redis] Redis source & sink connector supports redis cluster mode connection and user authentication
    
    * [Feature][Connector-V2][Redis] Update docs
    
    * [Improve][Connector-V2][Redis] Support multi nodes setting in redis cluster mode
    
    * [Improve][Connector-V2][Redis] Support parse mode for hash keys
    
    * [Improve][Connector-V2][Redis] Update redis source doc
---
 docs/en/connector-v2/sink/Redis.md                 |  43 +++++---
 docs/en/connector-v2/source/Redis.md               | 110 +++++++++++++++++++--
 .../seatunnel/redis/config/JedisWrapper.java       |  89 +++++++++++++++++
 .../seatunnel/redis/config/RedisConfig.java        |  14 +++
 .../seatunnel/redis/config/RedisParameters.java    |  73 +++++++++++++-
 .../seatunnel/redis/source/RedisSourceReader.java  |  21 +++-
 6 files changed, 324 insertions(+), 26 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md
index 405e42484..9c0fa3e95 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -13,15 +13,18 @@ Used to write data to Redis.
 
 ##  Options
 
-| name          | type   | required | default value |
-|-------------- |--------|----------|---------------|
-| host          | string | yes      | -             |
-| port          | int    | yes      | -             |
-| key           | string | yes      | -             |
-| data_type     | string | yes      | -             |
-| auth          | string | no       | -             |
-| format        | string | no       | json          |
-| common-options|        | no       | -             |
+| name           | type   | required | default value |
+|----------------|--------|----------|---------------|
+| host           | string | yes      | -             |
+| port           | int    | yes      | -             |
+| key            | string | yes      | -             |
+| data_type      | string | yes      | -             |
+| user           | string | no       | -             |
+| auth           | string | no       | -             |
+| mode           | string | no       | single        |
+| nodes          | list   | no       | -             |
+| format         | string | no       | json          |
+| common-options |        | no       | -             |
 
 ### host [string]
 
@@ -75,11 +78,25 @@ Redis data types, support `key` `hash` `list` `set` `zset`
 - zset
 > Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.
 
-### auth [String]
+### user [string]
+
+Redis authentication user (ACL user name, used together with the password set in `auth`); you need it when the server requires user-based authentication
+
+### auth [string]
 
 Redis authentication password, you need it when you connect to an encrypted cluster
 
-### format [String]
+### mode [string]
+
+Redis mode, either `single` or `cluster`; default is `single`
+
+### nodes [list]
+
+Redis node addresses, used only in cluster mode; each entry must be in the following format:
+
+[host1:port1, host2:port2]
+
+### format [string]
 
 The format of upstream data, now only support `json`, `text` will be supported later, default `json`.
 
@@ -121,3 +138,7 @@ simple:
 ### 2.2.0-beta 2022-09-26
 
 - Add Redis Sink Connector
+
+### next version
+
+- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md
index 974aa22aa..589efc01b 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,16 +17,20 @@ Used to read data from Redis.
 
 ##  Options
 
-| name           | type   | required | default value |
-|--------------- |--------|----------|---------------|
-| host           | string | yes      | -             |
-| port           | int    | yes      | -             |
-| keys           | string | yes      | -             |
-| data_type      | string | yes      | -             |
-| auth           | string | No       | -             |
-| schema         | config | No       | -             |
-| format         | string | No       | json          |
-| common-options |        | no       | -             |
+| name                | type   | required | default value |
+|---------------------|--------|----------|---------------|
+| host                | string | yes      | -             |
+| port                | int    | yes      | -             |
+| keys                | string | yes      | -             |
+| data_type           | string | yes      | -             |
+| user                | string | no       | -             |
+| auth                | string | no       | -             |
+| mode                | string | no       | single        |
+| hash_key_parse_mode | string | no       | all           |
+| nodes               | list   | no       | -             |
+| schema              | config | no       | -             |
+| format              | string | no       | json          |
+| common-options      |        | no       | -             |
 
 ### host [string]
 
@@ -36,6 +40,74 @@ redis host
 
 redis port
 
+### hash_key_parse_mode [string]
+
+Hash key parse mode, either `all` or `kv`; tells the connector how to parse the value of a hash key.
+
+When set to `all`, the connector treats the whole value of the hash key as one row and parses it with the schema config; when set to `kv`, the connector treats each key-value pair in the hash as a separate row and parses it with the schema config:
+
+For example, if the value of the hash key is as follows:
+
+```text
+{ 
+  "001": {
+    "name": "tyrantlucifer",
+    "age": 26
+  },
+  "002": {
+    "name": "Zongwen",
+    "age": 26
+  }
+}
+
+```
+
+If `hash_key_parse_mode` is `all` and the schema is configured as follows, the connector generates the following data:
+
+```hocon
+
+schema {
+  fields {
+    001 {
+      name = string
+      age = int
+    }
+    002 {
+      name = string
+      age = int
+    }
+  }
+}
+
+```
+
+| 001                             | 002                       |
+|---------------------------------|---------------------------|
+| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
+
+If `hash_key_parse_mode` is `kv` and the schema is configured as follows, the connector generates the following data:
+
+```hocon
+
+schema {
+  fields {
+    hash_key = string
+    name = string
+    age = int
+  }
+}
+
+```
+
+| hash_key | name          | age |
+|----------|---------------|-----|
+| 001      | tyrantlucifer | 26  |
+| 002      | Zongwen       | 26  |
+
+Each key-value pair in the hash is treated as a separate row and sent downstream.
+
+**Tip: the connector uses the first field of the schema config to hold the key of each key-value pair in the hash**
+
 ### keys [string]
 
 keys pattern
@@ -67,10 +139,24 @@ redis data types, support `key` `hash` `list` `set` `zset`
 > Each element in the sorted set will be sent downstream as a single row of data
 > For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
 
+### user [string]
+
+Redis authentication user (ACL user name, used together with the password set in `auth`); you need it when the server requires user-based authentication
+
 ### auth [string]
 
 redis authentication password, you need it when you connect to an encrypted cluster
 
+### mode [string]
+
+Redis mode, either `single` or `cluster`; default is `single`
+
+### nodes [list]
+
+Redis node addresses, used only in cluster mode; each entry must be in the following format:
+
+[host1:port1, host2:port2]
+
 ### format [string]
 
 the format of upstream data, now only support `json` `text`, default `json`.
@@ -166,3 +252,7 @@ simple:
 ### 2.2.0-beta 2022-09-26
 
 - Add Redis Source Connector
+
+### next version
+
+- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
new file mode 100644
index 000000000..8348f3561
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import lombok.NonNull;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Adapts a {@link JedisCluster} to the {@link Jedis} type so code written against a single
+ * {@code Jedis} client can also run against a Redis cluster.
+ *
+ * <p>Only the commands overridden below are delegated to the cluster. Any other {@code Jedis}
+ * method falls through to the inherited client created by the implicit no-argument
+ * {@code Jedis} constructor, not to the cluster, so override a command here before using it
+ * in cluster mode.
+ */
+public class JedisWrapper extends Jedis {
+    private final JedisCluster jedisCluster;
+
+    /**
+     * @param jedisCluster the cluster client that receives every overridden command; it is
+     *                     closed by {@link #close()}
+     */
+    public JedisWrapper(@NonNull JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
+    @Override
+    public String set(final String key, final String value) {
+        return jedisCluster.set(key, value);
+    }
+
+    @Override
+    public String get(final String key) {
+        return jedisCluster.get(key);
+    }
+
+    @Override
+    public long hset(final String key, final Map<String, String> hash) {
+        return jedisCluster.hset(key, hash);
+    }
+
+    @Override
+    public Map<String, String> hgetAll(final String key) {
+        return jedisCluster.hgetAll(key);
+    }
+
+    @Override
+    public long lpush(final String key, final String... strings) {
+        return jedisCluster.lpush(key, strings);
+    }
+
+    @Override
+    public List<String> lrange(final String key, final long start, final long stop) {
+        return jedisCluster.lrange(key, start, stop);
+    }
+
+    @Override
+    public long sadd(final String key, final String... members) {
+        return jedisCluster.sadd(key, members);
+    }
+
+    @Override
+    public Set<String> smembers(final String key) {
+        return jedisCluster.smembers(key);
+    }
+
+    @Override
+    public long zadd(final String key, final double score, final String member) {
+        return jedisCluster.zadd(key, score, member);
+    }
+
+    @Override
+    public List<String> zrange(final String key, final long start, final long stop) {
+        return jedisCluster.zrange(key, start, stop);
+    }
+
+    /**
+     * Closes the cluster client, then the inherited single-node client in case a
+     * non-overridden command opened a connection on it.
+     */
+    @Override
+    public void close() {
+        try {
+            jedisCluster.close();
+        } finally {
+            super.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index e158449b4..11ee665a9 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -21,8 +21,22 @@ public class RedisConfig {
     public static final String HOST = "host";
     public static final String PORT = "port";
     public static final String AUTH = "auth";
+    public static final String USER = "user";
     public static final String KEY_PATTERN = "keys";
     public static final String KEY = "key";
     public static final String DATA_TYPE = "data_type";
     public static final String FORMAT = "format";
+    public static final String MODE = "mode";
+    public static final String NODES = "nodes";
+    public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";
+
+    /** Redis deployment topology, selected by the {@code mode} option. */
+    public enum RedisMode {
+        SINGLE, CLUSTER
+    }
+
+    /** How the value of a hash key is turned into rows, selected by {@code hash_key_parse_mode}. */
+    public enum HashKeyParseMode {
+        ALL, KV
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index bc5147505..75b74d206 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -21,18 +21,28 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
+import redis.clients.jedis.ConnectionPoolConfig;
+import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 
 @Data
 public class RedisParameters implements Serializable {
     private String host;
     private int port;
     private String auth = "";
+    private String user = "";
     private String keysPattern;
     private String keyField;
     private RedisDataType redisDataType;
+    private RedisConfig.RedisMode mode = RedisConfig.RedisMode.SINGLE;
+    private RedisConfig.HashKeyParseMode hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL;
+    private List<String> redisNodes = Collections.emptyList();
 
     public void buildWithConfig(Config config) {
         // set host
@@ -43,6 +53,27 @@ public class RedisParameters implements Serializable {
         if (config.hasPath(RedisConfig.AUTH)) {
             this.auth = config.getString(RedisConfig.AUTH);
         }
+        // set user
+        if (config.hasPath(RedisConfig.USER)) {
+            this.user = config.getString(RedisConfig.USER);
+        }
+        // set mode (case-insensitive, the docs use lowercase "single" / "cluster")
+        if (config.hasPath(RedisConfig.MODE)) {
+            this.mode = RedisConfig.RedisMode.valueOf(config.getString(RedisConfig.MODE).toUpperCase());
+        } else {
+            this.mode = RedisConfig.RedisMode.SINGLE;
+        }
+        // set hash key mode
+        if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE)) {
+            this.hashKeyParseMode = RedisConfig.HashKeyParseMode
+                    .valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE).toUpperCase());
+        } else {
+            this.hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL;
+        }
+        // set redis nodes information
+        if (config.hasPath(RedisConfig.NODES)) {
+            this.redisNodes = config.getStringList(RedisConfig.NODES);
+        }
         // set key
         if (config.hasPath(RedisConfig.KEY)) {
             this.keyField = config.getString(RedisConfig.KEY);
@@ -61,10 +92,66 @@ public class RedisParameters implements Serializable {
     }
 
+    /**
+     * Creates a client for the configured mode: a plain {@link Jedis} in SINGLE mode, or a
+     * {@link JedisWrapper} delegating to a {@link JedisCluster} in CLUSTER mode.
+     *
+     * @throws IllegalArgumentException if a {@code nodes} entry is not a valid {@code host:port}
+     */
     public Jedis buildJedis() {
-        Jedis jedis = new Jedis(host, port);
-        if (StringUtils.isNotBlank(auth)) {
-            jedis.auth(auth);
+        switch (mode) {
+            case SINGLE:
+                Jedis jedis = new Jedis(host, port);
+                // An ACL user authenticates with AUTH <user> <password>; without a user,
+                // fall back to password-only AUTH for the default user.
+                if (StringUtils.isNotBlank(user)) {
+                    jedis.auth(user, auth);
+                } else if (StringUtils.isNotBlank(auth)) {
+                    jedis.auth(auth);
+                }
+                return jedis;
+            case CLUSTER:
+                // The configured host/port is always a seed node; `nodes` adds further seeds.
+                HashSet<HostAndPort> nodes = new HashSet<>();
+                nodes.add(new HostAndPort(host, port));
+                for (String redisNode : redisNodes) {
+                    nodes.add(parseHostAndPort(redisNode));
+                }
+                ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig();
+                JedisCluster jedisCluster;
+                if (StringUtils.isNotBlank(user)) {
+                    // user + password authentication; the client name argument is left unset (null)
+                    jedisCluster = new JedisCluster(nodes, JedisCluster.DEFAULT_TIMEOUT,
+                            JedisCluster.DEFAULT_TIMEOUT, JedisCluster.DEFAULT_MAX_ATTEMPTS,
+                            user, auth, null, connectionPoolConfig);
+                } else if (StringUtils.isNotBlank(auth)) {
+                    jedisCluster = new JedisCluster(nodes, JedisCluster.DEFAULT_TIMEOUT,
+                            JedisCluster.DEFAULT_TIMEOUT, JedisCluster.DEFAULT_MAX_ATTEMPTS,
+                            auth, connectionPoolConfig);
+                } else {
+                    jedisCluster = new JedisCluster(nodes);
+                }
+                return new JedisWrapper(jedisCluster);
+            default:
+                throw new IllegalArgumentException("Not support this redis mode");
         }
-        return jedis;
     }
+
+    /**
+     * Parses one {@code nodes} entry of the form {@code host:port}. The last ':' separates host
+     * from port, so an IPv6 literal such as {@code ::1:6379} is split as host {@code ::1}.
+     */
+    private static HostAndPort parseHostAndPort(String redisNode) {
+        String node = redisNode.trim();
+        int separator = node.lastIndexOf(':');
+        if (separator <= 0 || separator == node.length() - 1) {
+            throw new IllegalArgumentException("Invalid redis node information: [" + redisNode +
+                    "], redis node information must be in the format [host:port]");
+        }
+        try {
+            return new HostAndPort(node.substring(0, separator).trim(),
+                    Integer.parseInt(node.substring(separator + 1).trim()));
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid port in redis node information: [" + redisNode + "]", e);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index ada38fe6d..74f862ed8 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -19,9 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.redis.source;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
@@ -29,6 +33,8 @@ import redis.clients.jedis.Jedis;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -66,7 +72,23 @@ public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
                 if (deserializationSchema == null) {
                     output.collect(new SeaTunnelRow(new Object[]{value}));
                 } else {
-                    deserializationSchema.deserialize(value.getBytes(), output);
+                    if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV &&
+                            redisDataType == RedisDataType.HASH) {
+                        // KV mode: every entry of the hash becomes its own row. `value` is presumably the
+                        // JSON-serialized hash; each entry value is parsed as a JSON object and the entry
+                        // key is written into the schema's first field.
+                        SeaTunnelDataType<SeaTunnelRow> producedType = deserializationSchema.getProducedType();
+                        String hashKeyFieldName = ((SeaTunnelRowType) producedType).getFieldName(0);
+                        Map<String, String> recordsMap = JsonUtils.toMap(value);
+                        for (Map.Entry<String, String> entry : recordsMap.entrySet()) {
+                            Map<String, String> valuesMap = JsonUtils.toMap(entry.getValue());
+                            valuesMap.put(hashKeyFieldName, entry.getKey());
+                            deserializationSchema.deserialize(
+                                    JsonUtils.toJsonString(valuesMap).getBytes(StandardCharsets.UTF_8), output);
+                        }
+                    } else {
+                        deserializationSchema.deserialize(value.getBytes(StandardCharsets.UTF_8), output);
+                    }
                 }
             }
         }