You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/11 10:52:10 UTC
[inlong] branch master updated: [INLONG-4894][Sort] Add TubeMQ extract node (#4895)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2c7e189de [INLONG-4894][Sort] Add TubeMQ extract node (#4895)
2c7e189de is described below
commit 2c7e189deed4c300465aec95734ba20ace6933a5
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Jul 11 18:52:05 2022 +0800
[INLONG-4894][Sort] Add TubeMQ extract node (#4895)
---
.../sort/protocol/constant/TubeMQConstant.java | 47 ++++++++
.../inlong/sort/protocol/node/ExtractNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 2 +
.../protocol/node/extract/TubeMQExtractNode.java | 117 ++++++++++++++++++++
.../node/extract/TubeMQExtractNodeTest.java | 47 ++++++++
.../inlong/sort/parser/TubeMQNodeSqlParseTest.java | 120 +++++++++++++++++++++
6 files changed, 336 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
new file mode 100644
index 000000000..d8b2b7f0a
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
/**
 * Option keys and values used when building TubeMQ table options.
 *
 * <p>Holds constants only; it is final and has a private constructor so it cannot be
 * instantiated or subclassed.
 */
public final class TubeMQConstant {

    /** Option key for the TubeMQ topic to read. */
    public static final String TOPIC = "topic";

    /** Option key for the consumer group id. */
    public static final String GROUP_ID = "group.id";

    /** Option key selecting the Flink connector. */
    public static final String CONNECTOR = "connector";

    /** Value of the {@link #CONNECTOR} option that selects the TubeMQ connector. */
    public static final String TUBEMQ = "tubemq";

    /** Option key for the TubeMQ master address. */
    public static final String MASTER_RPC = "master.rpc";

    /** Option key for the record format. */
    public static final String FORMAT = "format";

    /** Option key for the consumer session key. */
    public static final String SESSION_KEY = "session.key";

    /**
     * The tubemq consumers use this tid set to filter records reading from server.
     */
    public static final String TID = "tid";

    /** Option key for the consumer startup mode. */
    public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode";

    private TubeMQConstant() {
        // constants holder; not instantiable
    }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index b17dbf535..3d663877b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
import javax.annotation.Nullable;
@@ -54,7 +55,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = FileSystemExtractNode.class, name = "fileSystemExtract"),
@JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract"),
@JsonSubTypes.Type(value = SqlServerExtractNode.class, name = "sqlserverExtract"),
- @JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract")
+ @JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract"),
+ @JsonSubTypes.Type(value = TubeMQExtractNode.class, name = "tubeMQExtract")
})
@Data
@NoArgsConstructor
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index d6d892b1f..d5de844fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
@@ -68,6 +69,7 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = PulsarExtractNode.class, name = "pulsarExtract"),
@JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract"),
@JsonSubTypes.Type(value = OracleExtractNode.class, name = "oracleExtract"),
+ @JsonSubTypes.Type(value = TubeMQExtractNode.class, name = "tubeMQExtract"),
@JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
new file mode 100644
index 000000000..8bfa79280
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * TubeMQ extract node for extracting data from Tube.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("tubeMQExtract")
+@Data
+public class TubeMQExtractNode extends ExtractNode implements Serializable {
+
+ private static final long serialVersionUID = -2544747886429528474L;
+
+ @Nonnull
+ @JsonProperty("masterRpc")
+ private String masterRpc;
+
+ @Nonnull
+ @JsonProperty("topic")
+ private String topic;
+
+ @Nonnull
+ @JsonProperty("format")
+ private String format;
+
+ @Nonnull
+ @JsonProperty("groupId")
+ private String groupId;
+
+ @JsonProperty("sessionKey")
+ private String sessionKey;
+
+ /**
+ * The tubemq consumers use this tid set to filter records reading from server.
+ */
+ @JsonProperty("tid")
+ private TreeSet<String> tid;
+
+ @JsonCreator
+ public TubeMQExtractNode(
+ @JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("watermarkField") WatermarkField waterMarkField,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("masterRpc") String masterRpc,
+ @Nonnull @JsonProperty("topic") String topic,
+ @Nonnull @JsonProperty("format") String format,
+ @Nonnull @JsonProperty("groupId") String groupId,
+ @JsonProperty("sessionKey") String sessionKey,
+ @JsonProperty("tid") TreeSet<String> tid
+ ) {
+ super(id, name, fields, waterMarkField, properties);
+ this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null");
+ this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
+ this.format = Preconditions.checkNotNull(format, "Format is null");
+ this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
+ this.sessionKey = sessionKey;
+ this.tid = tid;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> map = super.tableOptions();
+ map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ);
+ map.put(TubeMQConstant.TOPIC, topic);
+ map.put(TubeMQConstant.MASTER_RPC, masterRpc);
+ map.put(TubeMQConstant.GROUP_ID, groupId);
+ map.put(TubeMQConstant.FORMAT, format);
+ map.put(TubeMQConstant.SESSION_KEY, sessionKey);
+ if (null != tid && !tid.isEmpty()) {
+ map.put(TubeMQConstant.TID, tid.toString());
+ }
+ return map;
+ }
+
+ @Override
+ public String genTableName() {
+ return String.format("table_%s", super.getId());
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
new file mode 100644
index 000000000..cad255bc4
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link TubeMQExtractNode} serialize
+ */
+public class TubeMQExtractNodeTest extends SerializeBaseTest<TubeMQExtractNode> {
+
+ @Override
+ public TubeMQExtractNode getTestObject() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()));
+
+ return new TubeMQExtractNode("1", "tubeMQ_input", fields, null, null,
+ "127.0.0.1:8715","inlong","json","test",null,null);
+
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
new file mode 100644
index 000000000..f795b4faf
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test for Tube{@link TubeMQExtractNode} SQL parser.
+ */
+public class TubeMQNodeSqlParseTest extends AbstractTestBase {
+
+ /**
+ * Build TubeMQ extract node.
+ */
+ private TubeMQExtractNode buildTubeMQExtractNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()));
+
+ return new TubeMQExtractNode(id, "tubeMQ_input", fields, null, null,
+ "127.0.0.1:8715", "inlong", "json", "test", null, null);
+ }
+
+ /**
+ * Build Kafka load node.
+ */
+ private KafkaLoadNode buildKafkaNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()));
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()))
+ );
+ return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
+ "workerJson", "localhost:9092",
+ new JsonFormat(), null,
+ null, null);
+ }
+
+ /**
+ * Build node relation.
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Test extract data from TubeMQ and load data to Kafka.
+ */
+ @Test
+ public void testTubeMQToKafka() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node tubeMQExtractNode = buildTubeMQExtractNode("1");
+ Node kafkaLoadNode = buildKafkaNode("2");
+ StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(tubeMQExtractNode, kafkaLoadNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(tubeMQExtractNode),
+ Collections.singletonList(kafkaLoadNode))));
+ GroupInfo groupInf = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInf);
+ Assert.assertTrue(parser.parse().tryExecute());
+ }
+
+}