Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/31 06:09:19 UTC

[inlong] branch master updated: [INLONG-7240][Sort] Support load node of Redis (#7241)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab88b914e [INLONG-7240][Sort] Support load node of Redis (#7241)
ab88b914e is described below

commit ab88b914e884740bdad425d4985d6134a54d0804
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 31 14:09:12 2023 +0800

    [INLONG-7240][Sort] Support load node of Redis (#7241)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   2 +
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../sort/protocol/node/load/RedisLoadNode.java     | 249 +++++++++++++++++++++
 .../inlong/sort/parser/RedisNodeSqlParserTest.java | 156 +++++++++++++
 4 files changed, 409 insertions(+)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 8bb75e7e6..cb8d5a1f5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -73,6 +74,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
         @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
+        @JsonSubTypes.Type(value = RedisLoadNode.class, name = "redisLoad"),
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 4b70b1ac3..f1ec14169 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -48,6 +48,7 @@ import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -95,6 +96,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
         @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
+        @JsonSubTypes.Type(value = RedisLoadNode.class, name = "redisLoad"),
         @JsonSubTypes.Type(value = KuduLoadNode.class, name = "kuduLoad"),
 })
 public interface Node {
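
Both registrations above bind the Jackson type name "redisLoad" to RedisLoadNode, so a node serialized by the dashboard can be resolved back to the concrete class. Below is a minimal sketch, not part of this commit: it assumes Node's @JsonTypeInfo uses a "type" discriminator property, the payload is trimmed (a real one also carries fields, fieldRelations, and full format details, as in the test added below), and "csvFormat" assumes CsvFormat's @JsonTypeName.

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.sort.protocol.node.Node;
    import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;

    public class RedisLoadNodeJsonSketch {
        public static void main(String[] args) throws Exception {
            // Trimmed payload; the "type" value matches the @JsonSubTypes
            // name registered on Node above.
            String json = "{\"type\":\"redisLoad\",\"id\":\"2\",\"name\":\"redis_out\","
                    + "\"clusterMode\":\"standalone\",\"dataType\":\"HASH\","
                    + "\"schemaMapMode\":\"STATIC_PREFIX_MATCH\","
                    + "\"host\":\"127.0.0.1\",\"port\":6379,"
                    + "\"format\":{\"type\":\"csvFormat\"}}";
            Node node = new ObjectMapper().readValue(json, Node.class);
            System.out.println(node instanceof RedisLoadNode); // expected: true
        }
    }
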
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java
new file mode 100644
index 000000000..2ee12a9c7
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The load node of Redis.
+ */
+@JsonTypeName("redisLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class RedisLoadNode extends LoadNode implements InlongMetric, Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    public static final String ENABLE_CODE = "true";
+
+    private static final String EXTEND_ATTR_KEY_NAME = "keyName";
+    private static final String EXTEND_ATTR_VALUE_NAME = "keyValue";
+    public static final String DATA_TYPE = "data-type";
+    public static final String REDIS_MODE = "redis-mode";
+    public static final String SCHEMA_MAPPING_MODE = "schema-mapping-mode";
+    public static final String CLUSTER_MODE_STANDALONE = "standalone";
+    public static final String VALUE_HOST = "host";
+    public static final String VALUE_PORT = "port";
+    public static final String PASSWORD = "password";
+    public static final String CLUSTER_MODE_CLUSTER = "cluster";
+    public static final String CLUSTER_NODES = "cluster-nodes";
+    public static final String CLUSTER_PASSWORD = "cluster.password";
+    public static final String MASTER_NAME = "master.name";
+    public static final String SENTINELS_INFO = "sentinels.info";
+    public static final String SENTINELS_PASSWORD = "sentinels.password";
+    public static final String DATABASE = "database";
+    public static final String MAX_IDLE = "maxIdle";
+    public static final String SINK_MAX_RETRIES = "sink.max-retries";
+    public static final String MAX_TOTAL = "maxTotal";
+    public static final String MIN_IDLE = "minIdle";
+    public static final String SO_TIMEOUT = "soTimeout";
+    public static final String TIMEOUT = "timeout";
+    public static final String EXPIRE_TIME = "expire-time";
+    public static final String CONNECTOR_KEY = "connector";
+    public static final String CONNECTOR_REDIS_INLONG = "redis-inlong";
+    private Format format;
+
+    @JsonProperty("extList")
+    private List<HashMap<String, String>> extList;
+
+    private String clusterMode;
+    private String dataType;
+    private String schemaMapMode;
+    private String host;
+    private Integer port;
+    private String clusterNodes;
+    private String sentinelMasterName;
+    private String sentinelsInfo;
+    private Integer database;
+    private String password;
+    private Integer ttl;
+    private Integer timeout;
+    private Integer soTimeout;
+    private Integer maxTotal;
+    private Integer maxIdle;
+    private Integer minIdle;
+    private Integer maxRetries;
+
+    /**
+     * The prefix of ddl attr parsed from frontend advanced properties.
+     */
+    public static final String DDL_ATTR_PREFIX = "ddl.";
+
+    @JsonCreator
+    public RedisLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("extList") List<HashMap<String, String>> extList,
+            @JsonProperty("clusterMode") String clusterMode,
+            @JsonProperty("dataType") String dataType,
+            @JsonProperty("schemaMapMode") String schemaMapMode,
+            @Nullable @JsonProperty("host") String host,
+            @Nullable @JsonProperty("port") Integer port,
+            @Nullable @JsonProperty("clusterNodes") String clusterNodes,
+            @Nullable @JsonProperty("sentinelMasterName") String sentinelMasterName,
+            @Nullable @JsonProperty("sentinelsInfo") String sentinelsInfo,
+            @Nullable @JsonProperty("database") Integer database,
+            @Nullable @JsonProperty("password") String password,
+            @Nullable @JsonProperty("ttl") Integer ttl,
+            @Nonnull @JsonProperty("format") Format format,
+            @Nullable @JsonProperty("timeout") Integer timeout,
+            @Nullable @JsonProperty("soTimeout") Integer soTimeout,
+            @Nullable @JsonProperty("maxTotal") Integer maxTotal,
+            @Nullable @JsonProperty("maxIdle") Integer maxIdle,
+            @Nullable @JsonProperty("minIdle") Integer minIdle,
+            @Nullable @JsonProperty("maxRetries") Integer maxRetries) {
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+
+        this.extList = extList;
+        this.clusterMode = clusterMode;
+        this.dataType = dataType;
+        this.schemaMapMode = schemaMapMode;
+        this.host = host;
+        this.port = port;
+        this.clusterNodes = clusterNodes;
+        this.sentinelMasterName = sentinelMasterName;
+        this.sentinelsInfo = sentinelsInfo;
+        this.database = database;
+        this.password = password;
+        this.ttl = ttl;
+
+        this.format = Preconditions.checkNotNull(format, "format is null");
+
+        this.timeout = timeout;
+        this.soTimeout = soTimeout;
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+        this.maxRetries = maxRetries;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+
+        options.put(DATA_TYPE, dataType);
+        options.put(REDIS_MODE, clusterMode);
+        options.put(SCHEMA_MAPPING_MODE, schemaMapMode);
+        if (CLUSTER_MODE_STANDALONE.equals(clusterMode)) {
+            options.put(VALUE_HOST, host);
+            options.put(VALUE_PORT, String.valueOf(port));
+            if (StringUtils.isNotBlank(password)) {
+                options.put(PASSWORD, password);
+            }
+        } else if (CLUSTER_MODE_CLUSTER.equals(clusterMode)) {
+            options.put(CLUSTER_NODES, clusterNodes);
+            if (StringUtils.isNotBlank(password)) {
+                options.put(CLUSTER_PASSWORD, password);
+            }
+        } else {
+            options.put(MASTER_NAME, sentinelMasterName);
+            options.put(SENTINELS_INFO, sentinelsInfo);
+            if (StringUtils.isNotBlank(password)) {
+                options.put(SENTINELS_PASSWORD, password);
+            }
+        }
+        if (database != null) {
+            options.put(DATABASE, String.valueOf(database));
+        }
+        if (maxIdle != null) {
+            options.put(MAX_IDLE, String.valueOf(maxIdle));
+        }
+        if (maxRetries != null) {
+            options.put(SINK_MAX_RETRIES, String.valueOf(maxRetries));
+        }
+
+        if (maxTotal != null) {
+            options.put(MAX_TOTAL, String.valueOf(maxTotal));
+        }
+        if (minIdle != null) {
+            options.put(MIN_IDLE, String.valueOf(minIdle));
+        }
+        if (soTimeout != null) {
+            options.put(SO_TIMEOUT, String.valueOf(soTimeout));
+        }
+        if (timeout != null) {
+            options.put(TIMEOUT, String.valueOf(timeout));
+        }
+        if (ttl != null) {
+            options.put(EXPIRE_TIME, ttl + "s");
+        }
+
+        options.putAll(format.generateOptions(false));
+
+        // If an extended attribute key starts with the "ddl." prefix,
+        // it is passed through (prefix stripped) to the table's DDL statement
+        if (extList != null) {
+            extList.forEach(ext -> {
+                String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+                if (StringUtils.isNoneBlank(keyName) && keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
+                    String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
+
+        options.put(CONNECTOR_KEY, CONNECTOR_REDIS_INLONG);
+
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return getName();
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return null;
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+}
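
Given the standalone configuration exercised by the new test (HASH data type, STATIC_PREFIX_MATCH mapping, 127.0.0.1:6379, database 0, ttl 0, CSV format), tableOptions() resolves to roughly the map below. This is a hand-written sketch following the branches above, not captured output; the format keys contributed by CsvFormat.generateOptions(false) are omitted.

    import java.util.HashMap;
    import java.util.Map;

    public class RedisTableOptionsSketch {
        // Mirrors the standalone branch of RedisLoadNode#tableOptions().
        static Map<String, String> standaloneOptions() {
            Map<String, String> options = new HashMap<>();
            options.put("data-type", "HASH");
            options.put("redis-mode", "standalone");
            options.put("schema-mapping-mode", "STATIC_PREFIX_MATCH");
            options.put("host", "127.0.0.1");
            options.put("port", "6379");
            options.put("database", "0");
            options.put("expire-time", "0s");          // ttl 0 rendered as "0s"
            options.put("connector", "redis-inlong");
            // An extList entry {"keyName": "ddl.some-key", "keyValue": "v"}
            // would additionally yield options.put("some-key", "v");
            // "some-key" is a hypothetical attribute name.
            return options;
        }
    }
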
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
new file mode 100644
index 000000000..f5ea25b08
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Redis SQL parser.
+ */
+public class RedisNodeSqlParserTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // To use the append load mode, add this config
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private RedisLoadNode buildRedisLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        extList.add(map);
+
+        CsvFormat format = new CsvFormat();
+
+        return new RedisLoadNode(
+                "redis_table_name",
+                "redis_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                null,
+                "standalone",
+                "HASH",
+                "STATIC_PREFIX_MATCH",
+                "127.0.0.1",
+                6379,
+                null,
+                null,
+                null,
+                0,
+                null,
+                0,
+                format,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null);
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Test
+    public void testRedis() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildRedisLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
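
For orientation, the CREATE TABLE statement the parser derives for the load node above has roughly the shape below. This is a hand-written approximation, not parser output; the real statement is produced by FlinkSqlParser and carries every entry from tableOptions(), including the CsvFormat options.

    // Approximate shape only; column types follow the FieldInfo formats
    // used in buildRedisLoadNode().
    String approximateDdl =
            "CREATE TABLE `redis_table_name` (\n"
                    + "    `id` BIGINT,\n"
                    + "    `name` STRING,\n"
                    + "    `age` INT,\n"
                    + "    `ts` TIMESTAMP)\n"
                    + "WITH (\n"
                    + "    'connector' = 'redis-inlong',\n"
                    + "    'redis-mode' = 'standalone',\n"
                    + "    'data-type' = 'HASH',\n"
                    + "    'host' = '127.0.0.1',\n"
                    + "    'port' = '6379'\n"
                    + "    -- plus mapping-mode, database, expire-time, and format options\n"
                    + ")";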