You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/21 06:52:53 UTC

[incubator-inlong] branch master updated: [INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b44c878 [INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)
77b44c878 is described below

commit 77b44c8785b7e1cde2c9bfde5cb7e574fc34bf98
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Apr 21 14:52:48 2022 +0800

    [INLONG-3800][Sort] Add GroupInfo and StreamInfo definition to support transform (#3862)
---
 .../org/apache/inlong/sort/protocol/GroupInfo.java |  53 ++++++
 .../apache/inlong/sort/protocol/StreamInfo.java    |  61 +++++++
 .../apache/inlong/sort/protocol/node/LoadNode.java |   1 +
 .../protocol/node/transform/TransformNode.java     |   3 +
 .../apache/inlong/sort/protocol/GroupInfoTest.java | 195 +++++++++++++++++++++
 .../inlong/sort/protocol/StreamInfoTest.java       | 187 ++++++++++++++++++++
 .../sort/protocol/node/KafkaLoadNodeTest.java      |   2 +-
 7 files changed, 501 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
new file mode 100644
index 000000000..fbfcba6ad
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The concept of Groupinfo is the same as that of inlong group,
+ * and it is the smallest unit of sort task execution.
+ */
+@Data
+public class GroupInfo implements Serializable {
+
+    private static final long serialVersionUID = 6034630524669634079L;
+
+    @JsonProperty("groupId")
+    private String groupId;
+    @JsonProperty("streams")
+    private List<StreamInfo> streams;
+
+    /**
+     * @param groupId Uniquely identifies of GroupInfo
+     * @param streams The StreamInfo list that GroupInfo contains
+     */
+    @JsonCreator
+    public GroupInfo(@JsonProperty("groupId") String groupId,
+            @JsonProperty("streams") List<StreamInfo> streams) {
+        this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
+        this.streams = Preconditions.checkNotNull(streams, "streams is null");
+        Preconditions.checkState(!streams.isEmpty(), "streams is empty");
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
new file mode 100644
index 000000000..f63cbb700
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The concept of StreamInfo is the same as that of inlong stream
+ * It belongs to a group, and a group can contain one or more stream
+ */
+@Data
+public class StreamInfo implements Serializable {
+
+    private static final long serialVersionUID = 82342770067926123L;
+
+    @JsonProperty("streamId")
+    private String streamId;
+    @JsonProperty("nodes")
+    private List<Node> nodes;
+    @JsonProperty("relations")
+    private List<NodeRelationShip> relations;
+
+    /**
+     * @param streamId Uniquely identifies of GroupInfo
+     * @param nodes The node list that StreamInfo contains
+     * @param relations The relation list that StreamInfo contains,
+     *         it represents the relationship between nodes of StreamInfo
+     */
+    @JsonCreator
+    public StreamInfo(@JsonProperty("streamId") String streamId, @JsonProperty("nodes") List<Node> nodes,
+            @JsonProperty("relations") List<NodeRelationShip> relations) {
+        this.streamId = Preconditions.checkNotNull(streamId, "streamId is null");
+        this.nodes = Preconditions.checkNotNull(nodes, "nodes is null");
+        Preconditions.checkState(!nodes.isEmpty(), "nodes is empty");
+        this.relations = Preconditions.checkNotNull(relations, "relations is null");
+        Preconditions.checkState(!relations.isEmpty(), "relations is empty");
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 18b7e669d..76e5799c5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -61,6 +61,7 @@ public abstract class LoadNode implements Node {
     @JsonProperty("sinkParallelism")
     private Integer sinkParallelism;
     @JsonProperty("filters")
+    @JsonInclude(Include.NON_NULL)
     private List<FilterFunction> filters = new ArrayList<>();
     @Nullable
     @JsonInclude(Include.NON_NULL)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
index e0a8e39d2..53e3eaf18 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -55,6 +57,7 @@ public class TransformNode implements Node, Serializable {
     @JsonProperty("fieldRelationShips")
     private List<FieldRelationShip> fieldRelationShips;
     @JsonProperty("filters")
+    @JsonInclude(Include.NON_NULL)
     private List<FilterFunction> filters = new ArrayList<>();
 
     @JsonCreator
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
new file mode 100644
index 000000000..1a6805209
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * GroupInfo unit test class
+ */
+public class GroupInfoTest {
+
+    private MySqlExtractNode buildMySqlExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.MINUTE));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                wk, null, "id",
+                Collections.singletonList("table"), "localhost", "username", "username",
+                "test_database", 3306, 123, true, null);
+    }
+
+    private KafkaLoadNode buildKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                "topic", "localhost:9092", "json",
+                1, null);
+    }
+
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test serialize for GroupInfo
+     *
+     * @throws JsonProcessingException The exception may throws when serialize the GroupInfo
+     */
+    @Test
+    public void testSerialize() throws JsonProcessingException {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        ObjectMapper objectMapper = new ObjectMapper();
+        String expected =
+                "{\"groupId\":\"1\",\"streams\":[{\"streamId\":\"1\","
+                        + "\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+                        + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+                        + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                        + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                        + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                        + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                        + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                        + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+                        + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                        + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"interval\":{\"type\":\"constant\","
+                        + "\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                        + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
+                        + "\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
+                        + "\"password\":\"username\",\"database\":\"test_database\",\"port\":3306,"
+                        + "\"serverId\":123,\"incrementalSnapshotEnabled\":true},{\"type\":\"kafkaLoad\","
+                        + "\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+                        + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                        + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                        + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                        + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                        + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                        + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+                        + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+                        + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+                        + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+                        + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+                        + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+                        + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+                        + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+                        + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                        + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                        + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                        + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+                        + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+                        + "\"outputs\":[\"2\"]}]}]}";
+        assertEquals(expected, objectMapper.writeValueAsString(groupInfo));
+    }
+
+    /**
+     * Test deserialize for GroupInfo
+     *
+     * @throws JsonProcessingException The exception may throws when deserialize the GroupInfo
+     */
+    @Test
+    public void testDeserialize() throws JsonProcessingException {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        ObjectMapper objectMapper = new ObjectMapper();
+        String groupInfoStr = "{\"groupId\":\"1\","
+                + "\"streams\":[{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+                + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+                + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+                + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+                + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+                + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+                + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+                + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+                + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+                + "\"outputs\":[\"2\"]}]}]}";
+        GroupInfo expected = objectMapper.readValue(groupInfoStr, GroupInfo.class);
+        assertEquals(expected, groupInfo);
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
new file mode 100644
index 000000000..4c65eda6e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * StreamInfo unit test class
+ */
+public class StreamInfoTest {
+
+    private MySqlExtractNode buildMySqlExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.MINUTE));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                wk, null, "id",
+                Collections.singletonList("table"), "localhost", "username", "username",
+                "test_database", 3306, 123, true, null);
+    }
+
+    private KafkaLoadNode buildKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                "topic", "localhost:9092", "json",
+                1, null);
+    }
+
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test serialize for StreamInfo
+     *
+     * @throws JsonProcessingException The exception may throws when serialize the StreamInfo
+     */
+    @Test
+    public void testSerialize() throws JsonProcessingException {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        ObjectMapper objectMapper = new ObjectMapper();
+        String expected = "{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+                + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+                + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+                + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+                + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+                + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+                + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+                + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+                + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+                + "\"outputs\":[\"2\"]}]}";
+        assertEquals(expected, objectMapper.writeValueAsString(streamInfo));
+    }
+
+    /**
+     * Test deserialize for StreamInfo
+     *
+     * @throws JsonProcessingException The exception may throws when deserialize the StreamInfo
+     */
+    @Test
+    public void testDeserialize() throws JsonProcessingException {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        ObjectMapper objectMapper = new ObjectMapper();
+        String streamInfoStr = "{\"streamId\":\"1\",\"nodes\":[{\"type\":\"mysqlExtract\",\"id\":\"1\","
+                + "\"name\":\"mysql_input\",\"fields\":[{\"type\":\"base\",\"name\":\"id\","
+                + "\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+                + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+                + "\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
+                + "\"database\":\"test_database\",\"port\":3306,\"serverId\":123,\"incrementalSnapshotEnabled\":true},"
+                + "{\"type\":\"kafkaLoad\",\"id\":\"2\",\"name\":\"kafka_output\",\"fields\":[{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},{\"type\":\"base\",\"name\":\"name\","
+                + "\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+                + "\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + "\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+                + "\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}},"
+                + "\"outputField\":{\"type\":\"base\",\"name\":\"name\",\"formatInfo\":{\"type\":\"string\"}}},"
+                + "{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+                + "\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+                + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
+                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
+                + "\"outputs\":[\"2\"]}]}";
+        StreamInfo expected = objectMapper.readValue(streamInfoStr, StreamInfo.class);
+        assertEquals(expected, streamInfo);
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
index dcd5975c2..b11956474 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
@@ -43,7 +43,7 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
                 + "\"formatInfo\":{\"type\":\"string\"}}],\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\","
                 + "\"inputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}},"
                 + "\"outputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}}}],"
-                + "\"filters\":null,\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
                 + "\"format\":\"json\",\"sinkParallelism\":1,\"properties\":{}}";
     }
 }