You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/21 06:52:53 UTC
[incubator-inlong] branch master updated: [INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 77b44c878 [INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)
77b44c878 is described below
commit 77b44c8785b7e1cde2c9bfde5cb7e574fc34bf98
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Apr 21 14:52:48 2022 +0800
[INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)
---
.../org/apache/inlong/sort/protocol/GroupInfo.java | 53 ++++++
.../apache/inlong/sort/protocol/StreamInfo.java | 61 +++++++
.../apache/inlong/sort/protocol/node/LoadNode.java | 1 +
.../protocol/node/transform/TransformNode.java | 3 +
.../apache/inlong/sort/protocol/GroupInfoTest.java | 195 +++++++++++++++++++++
.../inlong/sort/protocol/StreamInfoTest.java | 187 ++++++++++++++++++++
.../sort/protocol/node/KafkaLoadNodeTest.java | 2 +-
7 files changed, 501 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
new file mode 100644
index 000000000..fbfcba6ad
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The concept of GroupInfo is the same as that of an inlong group,
+ * and it is the smallest unit of sort task execution.
+ */
+@Data
+public class GroupInfo implements Serializable {
+
+ private static final long serialVersionUID = 6034630524669634079L;
+
+ @JsonProperty("groupId")
+ private String groupId;
+ @JsonProperty("streams")
+ private List<StreamInfo> streams;
+
+ /**
+ * @param groupId The unique identifier of this GroupInfo; must not be null
+ * @param streams The StreamInfo list that this GroupInfo contains; must be neither null nor empty
+ */
+ @JsonCreator
+ public GroupInfo(@JsonProperty("groupId") String groupId,
+ @JsonProperty("streams") List<StreamInfo> streams) {
+ this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
+ this.streams = Preconditions.checkNotNull(streams, "streams is null");
+ Preconditions.checkState(!streams.isEmpty(), "streams is empty");
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
new file mode 100644
index 000000000..f63cbb700
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The concept of StreamInfo is the same as that of an inlong stream.
+ * It belongs to a group, and a group can contain one or more streams.
+ */
+@Data
+public class StreamInfo implements Serializable {
+
+ private static final long serialVersionUID = 82342770067926123L;
+
+ @JsonProperty("streamId")
+ private String streamId;
+ @JsonProperty("nodes")
+ private List<Node> nodes;
+ @JsonProperty("relations")
+ private List<NodeRelationShip> relations;
+
+ /**
+ * @param streamId The unique identifier of this StreamInfo; must not be null
+ * @param nodes The node list that this StreamInfo contains; must be neither null nor empty
+ * @param relations The relation list that this StreamInfo contains; must be neither null nor empty.
+ * It represents the relationships between the nodes of this StreamInfo
+ */
+ @JsonCreator
+ public StreamInfo(@JsonProperty("streamId") String streamId, @JsonProperty("nodes") List<Node> nodes,
+ @JsonProperty("relations") List<NodeRelationShip> relations) {
+ this.streamId = Preconditions.checkNotNull(streamId, "streamId is null");
+ this.nodes = Preconditions.checkNotNull(nodes, "nodes is null");
+ Preconditions.checkState(!nodes.isEmpty(), "nodes is empty");
+ this.relations = Preconditions.checkNotNull(relations, "relations is null");
+ Preconditions.checkState(!relations.isEmpty(), "relations is empty");
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 18b7e669d..76e5799c5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -61,6 +61,7 @@ public abstract class LoadNode implements Node {
@JsonProperty("sinkParallelism")
private Integer sinkParallelism;
@JsonProperty("filters")
+ @JsonInclude(Include.NON_NULL)
private List<FilterFunction> filters = new ArrayList<>();
@Nullable
@JsonInclude(Include.NON_NULL)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
index e0a8e39d2..53e3eaf18 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -55,6 +57,7 @@ public class TransformNode implements Node, Serializable {
@JsonProperty("fieldRelationShips")
private List<FieldRelationShip> fieldRelationShips;
@JsonProperty("filters")
+ @JsonInclude(Include.NON_NULL)
private List<FilterFunction> filters = new ArrayList<>();
@JsonCreator
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
new file mode 100644
index 000000000..1a6805209
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the JSON serialization and deserialization of GroupInfo
+ */
+public class GroupInfoTest {
+
+ private MySqlExtractNode buildMySqlExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.MINUTE));
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ wk, null, "id",
+ Collections.singletonList("table"), "localhost", "username", "username",
+ "test_database", 3306, 123, true, null);
+ }
+
+ private KafkaLoadNode buildKafkaNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()))
+ );
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ "topic", "localhost:9092", "json",
+ 1, null);
+ }
+
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Tests that a GroupInfo holding one stream (MySQL extract node to Kafka load node) serializes to the expected JSON
+ *
+ * @throws JsonProcessingException if the GroupInfo cannot be serialized to JSON
+ */
+ @Test
+ public void testSerialize() throws JsonProcessingException {
+ Node input = buildMySqlExtractNode();
+ Node output = buildKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+ buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ ObjectMapper objectMapper = new ObjectMapper();
+ String expected =
+ "{\"groupId\":\"1\",\"streams\":[{\"streamId\":\"1\","
+ + "\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+ + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+ + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+ + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"interval\":{\"type\":\"constant\","
+ + "\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
+ + "\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
+ + "\"password\":\"username\",\"database\":\"test_database\",\"port\":3306,"
+ + "\"serverId\":123,\"incrementalSnapshotEnabled\":true},{\"type\":\"kafkaLoad\","
+ + "\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+ + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+ + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+ + "\"outputs\":[\"2\"]}]}]}";
+ assertEquals(expected, objectMapper.writeValueAsString(groupInfo));
+ }
+
+ /**
+ * Tests that the JSON literal below deserializes to a GroupInfo equal to the one built by hand.
+ * NOTE(review): the local named 'expected' holds the deserialized (actual) value; consider swapping the names.
+ * @throws JsonProcessingException if the JSON cannot be deserialized into a GroupInfo
+ */
+ @Test
+ public void testDeserialize() throws JsonProcessingException {
+ Node input = buildMySqlExtractNode();
+ Node output = buildKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+ buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ ObjectMapper objectMapper = new ObjectMapper();
+ String groupInfoStr = "{\"groupId\":\"1\","
+ + "\"streams\":[{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+ + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+ + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+ + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+ + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+ + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+ + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+ + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+ + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+ + "\"outputs\":[\"2\"]}]}]}";
+ GroupInfo expected = objectMapper.readValue(groupInfoStr, GroupInfo.class);
+ assertEquals(expected, groupInfo);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
new file mode 100644
index 000000000..4c65eda6e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the JSON serialization and deserialization of StreamInfo
+ */
+public class StreamInfoTest {
+
+ private MySqlExtractNode buildMySqlExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.MINUTE));
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ wk, null, "id",
+ Collections.singletonList("table"), "localhost", "username", "username",
+ "test_database", 3306, 123, true, null);
+ }
+
+ private KafkaLoadNode buildKafkaNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()))
+ );
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ "topic", "localhost:9092", "json",
+ 1, null);
+ }
+
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Tests that a StreamInfo linking a MySQL extract node to a Kafka load node serializes to the expected JSON
+ *
+ * @throws JsonProcessingException if the StreamInfo cannot be serialized to JSON
+ */
+ @Test
+ public void testSerialize() throws JsonProcessingException {
+ Node input = buildMySqlExtractNode();
+ Node output = buildKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+ buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+ ObjectMapper objectMapper = new ObjectMapper();
+ String expected = "{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+ + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+ + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+ + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+ + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+ + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+ + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+ + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+ + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+ + "\"outputs\":[\"2\"]}]}";
+ assertEquals(expected, objectMapper.writeValueAsString(streamInfo));
+ }
+
+ /**
+ * Tests that the JSON literal below deserializes to a StreamInfo equal to the one built by hand.
+ * NOTE(review): the local named 'expected' holds the deserialized (actual) value; consider swapping the names.
+ * @throws JsonProcessingException if the JSON cannot be deserialized into a StreamInfo
+ */
+ @Test
+ public void testDeserialize() throws JsonProcessingException {
+ Node input = buildMySqlExtractNode();
+ Node output = buildKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+ buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+ ObjectMapper objectMapper = new ObjectMapper();
+ String streamInfoStr = "{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+ + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+ + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+ + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+ + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+ + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+ + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+ + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+ + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+ + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+ + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+ + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+ + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+ + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+ + "\"outputs\":[\"2\"]}]}";
+ StreamInfo expected = objectMapper.readValue(streamInfoStr, StreamInfo.class);
+ assertEquals(expected, streamInfo);
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
index dcd5975c2..b11956474 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
@@ -43,7 +43,7 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
+ "\"formatInfo\":{\"type\":\"string\"}}],\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\","
+ "\"inputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}},"
+ "\"outputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}}}],"
- + "\"filters\":null,\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+ "\"format\":\"json\",\"sinkParallelism\":1,\"properties\":{}}";
}
}