You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/31 06:09:19 UTC
[inlong] branch master updated: [INLONG-7240][Sort] Support load node of Redis (#7241)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab88b914e [INLONG-7240][Sort] Support load node of Redis (#7241)
ab88b914e is described below
commit ab88b914e884740bdad425d4985d6134a54d0804
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 31 14:09:12 2023 +0800
[INLONG-7240][Sort] Support load node of Redis (#7241)
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 2 +
.../org/apache/inlong/sort/protocol/node/Node.java | 2 +
.../sort/protocol/node/load/RedisLoadNode.java | 249 +++++++++++++++++++++
.../inlong/sort/parser/RedisNodeSqlParserTest.java | 156 +++++++++++++
4 files changed, 409 insertions(+)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 8bb75e7e6..cb8d5a1f5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -73,6 +74,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
@JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
@JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
+ @JsonSubTypes.Type(value = RedisLoadNode.class, name = "redisLoad"),
})
@NoArgsConstructor
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 4b70b1ac3..f1ec14169 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -48,6 +48,7 @@ import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -95,6 +96,7 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
@JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
@JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
+ @JsonSubTypes.Type(value = RedisLoadNode.class, name = "redisLoad"),
@JsonSubTypes.Type(value = KuduLoadNode.class, name = "kuduLoad"),
})
public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java
new file mode 100644
index 000000000..2ee12a9c7
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/RedisLoadNode.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The load node of redis.
+ *
+ * <p>Carries the connection, pooling and data-mapping settings of a Redis sink and turns them,
+ * in {@link #tableOptions()}, into the option map of the {@code redis-inlong} connector.
+ */
+@JsonTypeName("redisLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class RedisLoadNode extends LoadNode implements InlongMetric, Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    public static final String ENABLE_CODE = "true";
+
+    // Keys looked up in every entry of extList.
+    private static final String EXTEND_ATTR_KEY_NAME = "keyName";
+    private static final String EXTEND_ATTR_VALUE_NAME = "keyValue";
+
+    // Option keys, and the fixed option values, used by tableOptions().
+    public static final String DATA_TYPE = "data-type";
+    public static final String REDIS_MODE = "redis-mode";
+    public static final String SCHEMA_MAPPING_MODE = "schema-mapping-mode";
+    public static final String CLUSTER_MODE_STANDALONE = "standalone";
+    public static final String VALUE_HOST = "host";
+    public static final String VALUE_PORT = "port";
+    public static final String PASSWORD = "password";
+    public static final String CLUSTER_MODE_CLUSTER = "cluster";
+    public static final String CLUSTER_NODES = "cluster-nodes";
+    public static final String CLUSTER_PASSWORD = "cluster.password";
+    public static final String MASTER_NAME = "master.name";
+    public static final String SENTINELS_INFO = "sentinels.info";
+    public static final String SENTINELS_PASSWORD = "sentinels.password";
+    public static final String DATABASE = "database";
+    public static final String MAX_IDLE = "maxIdle";
+    public static final String SINK_MAX_RETRIES = "sink.max-retries";
+    public static final String MAX_TOTAL = "maxTotal";
+    public static final String MIN_IDLE = "minIdle";
+    public static final String SO_TIMEOUT = "soTimeout";
+    public static final String TIMEOUT = "timeout";
+    public static final String EXPIRE_TIME = "expire-time";
+    public static final String CONNECTOR_KEY = "connector";
+    public static final String CONNECTOR_REDIS_INLONG = "redis-inlong";
+
+    /** Value format; its generated options are merged into the table options. */
+    private Format format;
+
+    /**
+     * Extended attributes. Each entry is read through the keys {@code keyName} and
+     * {@code keyValue}; only names starting with {@link #DDL_ATTR_PREFIX} are used.
+     */
+    @JsonProperty("extList")
+    private List<HashMap<String, String>> extList;
+
+    /**
+     * Deployment mode. {@code standalone} and {@code cluster} are matched explicitly;
+     * every other value is handled with the sentinel options.
+     */
+    private String clusterMode;
+    private String dataType;
+    private String schemaMapMode;
+    private String host;
+    private Integer port;
+    private String clusterNodes;
+    private String sentinelMasterName;
+    private String sentinelsInfo;
+    private Integer database;
+    private String password;
+    /** Written as {@code expire-time} with an {@code s} suffix (presumably seconds). */
+    private Integer ttl;
+    private Integer timeout;
+    private Integer soTimeout;
+    private Integer maxTotal;
+    private Integer maxIdle;
+    private Integer minIdle;
+    private Integer maxRetries;
+
+    /**
+     * The prefix of ddl attr parsed from frontend advanced properties.
+     */
+    public static final String DDL_ATTR_PREFIX = "ddl.";
+
+    /**
+     * Creates a Redis load node. This constructor is also the Jackson creator, so every
+     * argument is bound from the JSON property of the same name.
+     *
+     * <p>The first eight arguments are handed unchanged to the {@link LoadNode} constructor;
+     * the remaining ones are stored on this node. Arguments annotated {@code @Nullable} may be
+     * omitted.
+     *
+     * @param format the value format, must not be {@code null}
+     * @throws NullPointerException if {@code format} is {@code null}
+     */
+    @JsonCreator
+    public RedisLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("extList") List<HashMap<String, String>> extList,
+            @JsonProperty("clusterMode") String clusterMode,
+            @JsonProperty("dataType") String dataType,
+            @JsonProperty("schemaMapMode") String schemaMapMode,
+            @Nullable @JsonProperty("host") String host,
+            @Nullable @JsonProperty("port") Integer port,
+            @Nullable @JsonProperty("clusterNodes") String clusterNodes,
+            @Nullable @JsonProperty("sentinelMasterName") String sentinelMasterName,
+            @Nullable @JsonProperty("sentinelsInfo") String sentinelsInfo,
+            @Nullable @JsonProperty("database") Integer database,
+            @Nullable @JsonProperty("password") String password,
+            @Nullable @JsonProperty("ttl") Integer ttl,
+            @Nonnull @JsonProperty("format") Format format,
+            @Nullable @JsonProperty("timeout") Integer timeout,
+            @Nullable @JsonProperty("soTimeout") Integer soTimeout,
+            @Nullable @JsonProperty("maxTotal") Integer maxTotal,
+            @Nullable @JsonProperty("maxIdle") Integer maxIdle,
+            @Nullable @JsonProperty("minIdle") Integer minIdle,
+            @Nullable @JsonProperty("maxRetries") Integer maxRetries) {
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+
+        this.extList = extList;
+        this.clusterMode = clusterMode;
+        this.dataType = dataType;
+        this.schemaMapMode = schemaMapMode;
+        this.host = host;
+        this.port = port;
+        this.clusterNodes = clusterNodes;
+        this.sentinelMasterName = sentinelMasterName;
+        this.sentinelsInfo = sentinelsInfo;
+        this.database = database;
+        this.password = password;
+        this.ttl = ttl;
+
+        this.format = Preconditions.checkNotNull(format, "format is null");
+
+        this.timeout = timeout;
+        this.soTimeout = soTimeout;
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+        this.maxRetries = maxRetries;
+    }
+
+    /**
+     * Builds the connector options of this node on top of the options of the parent class.
+     *
+     * <p>Entries are added in this order: data type, redis mode and schema mapping mode, the
+     * mode specific connection options, the optional numeric settings, the options generated
+     * by the format, the {@code ddl.} extended attributes and finally the connector
+     * identifier. Later entries replace earlier ones with the same key, so the connector
+     * identifier always ends up as {@code redis-inlong}.
+     *
+     * @return the option map obtained from the parent class, extended with the Redis options
+     */
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+
+        options.put(DATA_TYPE, dataType);
+        options.put(REDIS_MODE, clusterMode);
+        options.put(SCHEMA_MAPPING_MODE, schemaMapMode);
+        putConnectionOptions(options);
+
+        putIfPresent(options, DATABASE, database);
+        putIfPresent(options, MAX_IDLE, maxIdle);
+        putIfPresent(options, SINK_MAX_RETRIES, maxRetries);
+        putIfPresent(options, MAX_TOTAL, maxTotal);
+        putIfPresent(options, MIN_IDLE, minIdle);
+        putIfPresent(options, SO_TIMEOUT, soTimeout);
+        putIfPresent(options, TIMEOUT, timeout);
+        if (ttl != null) {
+            options.put(EXPIRE_TIME, ttl + "s");
+        }
+
+        options.putAll(format.generateOptions(false));
+        putExtendedDdlOptions(options);
+
+        options.put(CONNECTOR_KEY, CONNECTOR_REDIS_INLONG);
+
+        return options;
+    }
+
+    /**
+     * Adds the connection options that belong to the configured cluster mode, followed by the
+     * password under the key of that mode when the password is not blank.
+     */
+    private void putConnectionOptions(Map<String, String> options) {
+        final String passwordKey;
+        if (CLUSTER_MODE_STANDALONE.equals(clusterMode)) {
+            options.put(VALUE_HOST, host);
+            // A missing port is left out instead of being written as the literal string "null".
+            putIfPresent(options, VALUE_PORT, port);
+            passwordKey = PASSWORD;
+        } else if (CLUSTER_MODE_CLUSTER.equals(clusterMode)) {
+            options.put(CLUSTER_NODES, clusterNodes);
+            passwordKey = CLUSTER_PASSWORD;
+        } else {
+            // Every mode other than standalone and cluster is configured as sentinel.
+            options.put(MASTER_NAME, sentinelMasterName);
+            options.put(SENTINELS_INFO, sentinelsInfo);
+            passwordKey = SENTINELS_PASSWORD;
+        }
+        if (StringUtils.isNotBlank(password)) {
+            options.put(passwordKey, password);
+        }
+    }
+
+    /**
+     * Copies the extended attributes whose name starts with {@code ddl.} into the options,
+     * using the name without that prefix as the option key.
+     */
+    private void putExtendedDdlOptions(Map<String, String> options) {
+        if (extList == null) {
+            return;
+        }
+        for (HashMap<String, String> ext : extList) {
+            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+            if (StringUtils.isNotBlank(keyName) && keyName.startsWith(DDL_ATTR_PREFIX)) {
+                options.put(keyName.substring(DDL_ATTR_PREFIX.length()), ext.get(EXTEND_ATTR_VALUE_NAME));
+            }
+        }
+    }
+
+    /**
+     * Stores the string form of {@code value} under {@code key} unless the value is {@code null}.
+     */
+    private static void putIfPresent(Map<String, String> options, String key, Object value) {
+        if (value != null) {
+            options.put(key, String.valueOf(value));
+        }
+    }
+
+    /**
+     * Uses the node name as the table name.
+     */
+    @Override
+    public String genTableName() {
+        return getName();
+    }
+
+    /**
+     * This node declares no primary key.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public String getPrimaryKey() {
+        return null;
+    }
+
+    /**
+     * Returns the partition fields exactly as provided by the parent class.
+     */
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
new file mode 100644
index 000000000..f5ea25b08
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Redis SQL parser.
+ *
+ * <p>Wires a MySQL extract node to a Redis load node and checks that {@link FlinkSqlParser}
+ * produces both create-table and load statements for that stream.
+ */
+public class RedisNodeSqlParserTest extends AbstractTestBase {
+
+    /**
+     * Builds the MySQL extract node that acts as the source of the test stream.
+     *
+     * @param id the node id
+     * @return the extract node
+     */
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // Extra property handed to the extract node ("append-mode" = "true").
+        // NOTE(review): the previous comment here referred to the Hive load mode - confirm this
+        // property is still wanted for the Redis test.
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    /**
+     * Builds a standalone-mode Redis load node with a CSV format and one empty extended
+     * attribute entry.
+     *
+     * @return the load node
+     */
+    private RedisLoadNode buildRedisLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+        // One entry without "keyName": it adds no option but exercises the extended-attribute loop.
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        extList.add(new HashMap<>());
+
+        CsvFormat format = new CsvFormat();
+
+        return new RedisLoadNode(
+                "redis_table_name",
+                "redis_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                extList,
+                "standalone",
+                "HASH",
+                "STATIC_PREFIX_MATCH",
+                "127.0.0.1",
+                6379,
+                null,
+                null,
+                null,
+                0,
+                null,
+                0,
+                format,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null);
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Parses a MySQL-to-Redis stream and expects non-empty load and create-table statements.
+     *
+     * @throws Exception if the parser fails
+     */
+    @Test
+    public void testRedis() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildRedisLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        // Separate assertions so that a failure names the list that was empty.
+        Assert.assertFalse("no load sql was generated", result.getLoadSqls().isEmpty());
+        Assert.assertFalse("no create table sql was generated", result.getCreateTableSqls().isEmpty());
+    }
+}