You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/11 10:52:10 UTC

[inlong] branch master updated: [INLONG-4894][Sort] Add TubeMQ extract node (#4895)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c7e189de [INLONG-4894][Sort] Add TubeMQ extract node (#4895)
2c7e189de is described below

commit 2c7e189deed4c300465aec95734ba20ace6933a5
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Jul 11 18:52:05 2022 +0800

    [INLONG-4894][Sort] Add TubeMQ extract node (#4895)
---
 .../sort/protocol/constant/TubeMQConstant.java     |  47 ++++++++
 .../inlong/sort/protocol/node/ExtractNode.java     |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../protocol/node/extract/TubeMQExtractNode.java   | 117 ++++++++++++++++++++
 .../node/extract/TubeMQExtractNodeTest.java        |  47 ++++++++
 .../inlong/sort/parser/TubeMQNodeSqlParseTest.java | 120 +++++++++++++++++++++
 6 files changed, 336 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
new file mode 100644
index 000000000..d8b2b7f0a
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * Option keys (and the connector value) used when building TubeMQ table options.
+ */
+public class TubeMQConstant {
+
+    public static final String TOPIC = "topic"; // key for the topic name
+
+    public static final String GROUP_ID = "group.id"; // key for the group id
+
+    public static final String CONNECTOR = "connector"; // key whose value names the connector
+
+    public static final String TUBEMQ = "tubemq"; // value stored under CONNECTOR for TubeMQ
+
+    public static final String MASTER_RPC = "master.rpc"; // key for the master RPC address
+
+    public static final String FORMAT = "format"; // key for the data format name
+
+    public static final String SESSION_KEY = "session.key"; // key for the session key
+
+    /**
+     * Key for the tid set that TubeMQ consumers use to filter records read from the server.
+     */
+    public static final String TID = "tid";
+
+    public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode"; // key for the startup mode
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index b17dbf535..3d663877b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
 import javax.annotation.Nullable;
@@ -54,7 +55,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = FileSystemExtractNode.class, name = "fileSystemExtract"),
         @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract"),
         @JsonSubTypes.Type(value = SqlServerExtractNode.class, name = "sqlserverExtract"),
-        @JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract")
+        @JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract"),
+        @JsonSubTypes.Type(value = TubeMQExtractNode.class, name = "tubeMQExtract")
 })
 @Data
 @NoArgsConstructor
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index d6d892b1f..d5de844fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
 import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
@@ -68,6 +69,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = PulsarExtractNode.class, name = "pulsarExtract"),
         @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract"),
         @JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract"),
+        @JsonSubTypes.Type(value = TubeMQExtractNode.class, name = "tubeMQExtract"),
         @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
new file mode 100644
index 000000000..8bfa79280
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * Extract node that describes a TubeMQ source and exposes its settings as table options.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("tubeMQExtract")
+@Data
+public class TubeMQExtractNode extends ExtractNode implements Serializable {
+
+    private static final long serialVersionUID = -2544747886429528474L;
+
+    @Nonnull
+    @JsonProperty("masterRpc")
+    private String masterRpc; // null-checked in the constructor; stored under TubeMQConstant.MASTER_RPC
+
+    @Nonnull
+    @JsonProperty("topic")
+    private String topic; // null-checked in the constructor; stored under TubeMQConstant.TOPIC
+
+    @Nonnull
+    @JsonProperty("format")
+    private String format; // null-checked in the constructor; stored under TubeMQConstant.FORMAT
+
+    @Nonnull
+    @JsonProperty("groupId")
+    private String groupId; // null-checked in the constructor; stored under TubeMQConstant.GROUP_ID
+
+    @JsonProperty("sessionKey")
+    private String sessionKey; // not null-checked; the constructor accepts null
+
+    /**
+     * Tid set that TubeMQ consumers use to filter records read from the server; may be null or empty.
+     */
+    @JsonProperty("tid")
+    private TreeSet<String> tid;
+    /** Creates the node; masterRpc, topic, format and groupId are null-checked, sessionKey and tid are not. */
+    @JsonCreator
+    public TubeMQExtractNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField waterMarkField,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("masterRpc") String masterRpc,
+            @Nonnull @JsonProperty("topic") String topic,
+            @Nonnull @JsonProperty("format") String format,
+            @Nonnull @JsonProperty("groupId") String groupId,
+            @JsonProperty("sessionKey") String sessionKey,
+            @JsonProperty("tid") TreeSet<String> tid
+    ) {
+        super(id, name, fields, waterMarkField, properties);
+        this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null"); // NPE when null
+        this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
+        this.format = Preconditions.checkNotNull(format, "Format is null");
+        this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
+        this.sessionKey = sessionKey;
+        this.tid = tid;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() { // adds the TubeMQ entries to the options from ExtractNode
+        Map<String, String> map = super.tableOptions();
+        map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ);
+        map.put(TubeMQConstant.TOPIC, topic);
+        map.put(TubeMQConstant.MASTER_RPC, masterRpc);
+        map.put(TubeMQConstant.GROUP_ID, groupId);
+        map.put(TubeMQConstant.FORMAT, format);
+        map.put(TubeMQConstant.SESSION_KEY, sessionKey); // NOTE(review): may store null, unlike the guarded tid
+        if (null != tid && !tid.isEmpty()) {
+            map.put(TubeMQConstant.TID, tid.toString()); // NOTE(review): renders as "[a, b]" - confirm format
+        }
+        return map;
+    }
+
+    @Override
+    public String genTableName() { // table name is "table_" followed by the node id
+        return String.format("table_%s", super.getId());
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
new file mode 100644
index 000000000..cad255bc4
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Serialization test for {@link TubeMQExtractNode}; the checks themselves live in SerializeBaseTest.
+ */
+public class TubeMQExtractNodeTest extends SerializeBaseTest<TubeMQExtractNode> {
+
+    @Override
+    public TubeMQExtractNode getTestObject() { // supplies the node instance handed to the base test
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()));
+
+        return new TubeMQExtractNode("1", "tubeMQ_input", fields, null, null, // id, name, fields, watermark, properties
+                "127.0.0.1:8715","inlong","json","test",null,null); // masterRpc, topic, format, groupId, sessionKey, tid
+
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
new file mode 100644
index 000000000..f795b4faf
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test that a {@link TubeMQExtractNode} can be parsed and executed through {@link FlinkSqlParser}.
+ */
+public class TubeMQNodeSqlParseTest extends AbstractTestBase {
+
+    /**
+     * Build a TubeMQ extract node with the given id and the fields id, name, age and salary.
+     */
+    private TubeMQExtractNode buildTubeMQExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()));
+
+        return new TubeMQExtractNode(id, "tubeMQ_input", fields, null, null, // watermark and properties are null
+                "127.0.0.1:8715", "inlong", "json", "test", null, null); // masterRpc, topic, format, groupId, sessionKey, tid
+    }
+
+    /**
+     * Build a Kafka load node with the given id; its relations cover id, name and age only.
+     */
+    private KafkaLoadNode buildKafkaNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo())); // NOTE(review): no FieldRelation maps salary - confirm
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
+                "workerJson", "localhost:9092",
+                new JsonFormat(), null,
+                null, null);
+    }
+
+    /**
+     * Build a node relation from the ids of the given input and output nodes.
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test extracting data from TubeMQ and loading it into Kafka.
+     */
+    @Test
+    public void testTubeMQToKafka() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000); // checkpoint interval in milliseconds
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node tubeMQExtractNode = buildTubeMQExtractNode("1");
+        Node kafkaLoadNode = buildKafkaNode("2");
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(tubeMQExtractNode, kafkaLoadNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(tubeMQExtractNode),
+                        Collections.singletonList(kafkaLoadNode)))); // one relation: node "1" feeds node "2"
+        GroupInfo groupInf = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInf);
+        Assert.assertTrue(parser.parse().tryExecute()); // passes only when tryExecute() returns true
+    }
+
+}