You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/20 12:21:01 UTC

[incubator-inlong] branch master updated: [INLONG-3719][Manager] Support data_transformation feature in Inlong (#3774)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 39e7bc99d [INLONG-3719][Manager] Support data_transformation feature in Inlong  (#3774)
39e7bc99d is described below

commit 39e7bc99d0a3ef7d377e833c8401828f0e5762c9
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Apr 20 20:20:56 2022 +0800

    [INLONG-3719][Manager] Support data_transformation feature in Inlong  (#3774)
    
    * Add constructor to streamTransform
    
    * Add unit tests for stream transform
    
    * Update API doc
    
    * Add processing_time for stream
    
    * resolve circular dependency in manager service
    
    * Update SQL file format
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../manager/client/AutoPush2HiveExample.java       |   4 +-
 .../inlong/manager/client/File2HiveExample.java    |   4 +-
 .../inlong/manager/client/Kafka2HiveExample.java   |   4 +-
 .../inlong/manager/client/cli/CreateGroupConf.java |   2 +-
 inlong-manager/manager-client/pom.xml              |   6 +
 .../inlong/manager/client/api/InlongGroup.java     |  10 +
 .../inlong/manager/client/api/InlongGroupConf.java |   5 +-
 .../inlong/manager/client/api/InlongStream.java    |  11 +
 .../manager/client/api/InlongStreamBuilder.java    |   9 +
 .../manager/client/api/InlongStreamConf.java       |   1 +
 .../inlong/manager/client/api/SinkField.java       |   1 +
 .../inlong/manager/client/api/StreamSink.java      |   3 +-
 .../inlong/manager/client/api/StreamSource.java    |   3 +-
 .../api/{StreamSink.java => StreamTransform.java}  |  25 +--
 .../manager/client/api/impl/BlankInlongGroup.java  |   5 +
 .../api/impl/DefaultInlongStreamBuilder.java       |  85 +++++--
 .../manager/client/api/impl/InlongGroupImpl.java   |   5 +
 .../manager/client/api/impl/InlongStreamImpl.java  | 101 ++++++++-
 .../client/api/inner/InnerInlongManagerClient.java | 123 ++++++++++-
 .../client/api/inner/InnerStreamContext.java       |   6 +
 .../api/transform/MultiDependencyTransform.java    |  68 ++++++
 .../api/transform/SingleDependencyTransform.java   |  65 ++++++
 .../inlong/manager/client/api/util/AssertUtil.java |   6 +
 .../manager/client/api/util/InlongParser.java      |   9 +
 .../client/api/util/InlongStreamSinkTransfer.java  |   2 +-
 .../api/util/InlongStreamSourceTransfer.java       |  87 +++++---
 .../client/api/util/InlongStreamTransfer.java      |   2 +-
 .../api/util/InlongStreamTransformTransfer.java    |  85 +++++++
 .../client/api/impl/InlongStreamImplTest.java      |  86 ++++++++
 .../api/impl/InnerInlongManagerClientTest.java     |   1 -
 .../inlong/manager/common/enums/Constant.java      |  31 ---
 .../enums/DBCollectorDetailTaskEntityStatus.java   |   1 +
 .../common/enums/DBCollectorTaskConstant.java      |   1 +
 .../common/enums/DBCollectorTaskReturnCode.java    |   1 +
 .../inlong/manager/common/enums/ErrorCodeEnum.java |  17 +-
 .../apache/inlong/manager/common/enums/MQType.java |   2 +-
 .../inlong/manager/common/enums/SinkType.java      |   3 -
 .../inlong/manager/common/enums/SourceType.java    |   3 -
 .../enums/{MQType.java => TransformType.java}      |  45 +++-
 .../manager/common/pojo/source/SourceRequest.java  |   5 +
 .../manager/common/pojo/source/SourceResponse.java |   5 +
 .../common/pojo/stream/InlongStreamRequest.java    |   3 +
 .../manager/common/pojo/stream}/StreamField.java   |  11 +-
 .../manager/common/pojo/stream/StreamNode.java}    |  37 ++--
 .../common/pojo/stream/StreamNodeRelationship.java |  51 +++++
 .../manager/common/pojo/stream/StreamPipeline.java | 106 +++++++++
 .../pojo/transform/TransformDefinition.java}       |  31 ++-
 .../TransformRequest.java}                         |  46 ++--
 .../common/pojo/transform/TransformResponse.java   |  56 +++++
 .../deduplication/DeDuplicationDefinition.java     |  83 +++++++
 .../pojo/transform/filter/FilterDefinition.java    | 113 ++++++++++
 .../pojo/transform/joiner/JoinerDefinition.java    |  79 +++++++
 .../replacer/StringReplacerDefinition.java         |  65 ++++++
 .../transform/splitter/SplitterDefinition.java     |  70 ++++++
 .../common/settings/InlongGroupSettings.java       |  77 +++----
 .../common/util/TransformDefinitionUtils.java      |  51 +++++
 .../common/pojo/stream/StreamPipelineTest.java     |  41 ++++
 .../pojo/transform/TransformDefinitionTest.java    |  99 +++++++++
 .../dao/entity/StreamSourceFieldEntity.java}       |  46 ++--
 .../manager/dao/entity/StreamTransformEntity.java} |  45 ++--
 .../dao/mapper/CommonDbServerEntityMapper.java     |   1 +
 .../dao/mapper/CommonFileServerEntityMapper.java   |   1 +
 .../dao/mapper/DBCollectorDetailTaskMapper.java    |   1 +
 ...per.java => StreamSourceFieldEntityMapper.java} |  41 ++--
 ...apper.java => StreamTransformEntityMapper.java} |  22 +-
 .../src/main/resources/generatorConfig.xml         |   6 +
 .../mappers/StreamSourceFieldEntityMapper.xml      | 245 +++++++++++++++++++++
 .../mappers/StreamTransformEntityMapper.xml        | 234 ++++++++++++++++++++
 .../manager/service/CommonOperateService.java      | 100 +--------
 .../core/impl/DBCollectorTaskServiceImpl.java      |   1 +
 .../service/core/impl/InlongGroupServiceImpl.java  |  21 +-
 .../service/core/impl/InlongStreamServiceImpl.java |  30 ++-
 .../core/impl/StreamConfigLogServiceImpl.java      |  13 +-
 .../core/impl/ThirdPartyClusterServiceImpl.java    |   9 +-
 .../service/sink/StreamSinkServiceImpl.java        | 122 +++++-----
 .../service/sink/ck/ClickHouseSinkOperation.java   |   6 +-
 .../service/sink/hive/HiveSinkOperation.java       |   6 +-
 .../service/sink/iceberg/IcebergSinkOperation.java |   6 +-
 .../service/sink/kafka/KafkaSinkOperation.java     |   6 +-
 .../service/source/AbstractSourceOperation.java    |  68 +++++-
 .../service/source/StreamSourceServiceImpl.java    |  27 +--
 .../source/autopush/AutoPushSourceOperation.java   |   2 +-
 .../source/binlog/BinlogSourceOperation.java       |   2 +-
 .../service/source/file/FileSourceOperation.java   |   2 +-
 .../service/source/kafka/KafkaSourceOperation.java |   2 +-
 .../thirdparty/mq/CreateTubeGroupTaskListener.java |   4 +-
 .../service/thirdparty/mq/TubeMqOptService.java    |  10 +-
 .../thirdparty/sort/CreateSortConfigListener.java  |   8 +-
 .../thirdparty/sort/PushSortConfigListener.java    |   8 +-
 .../thirdparty/sort/util/DataFlowUtils.java        | 113 ++++++++++
 .../service/transform/StreamTransformService.java  |  65 ++++++
 .../transform/StreamTransformServiceImpl.java      | 153 +++++++++++++
 .../inlong/manager/service/ServiceBaseTest.java    |   6 +-
 .../service/core/impl/AgentServiceTest.java        |  10 +-
 .../core/impl/InlongClusterServiceTest.java        |  14 +-
 .../transform/StreamTransformServiceTest.java      |  73 ++++++
 .../src/main/resources/application-test.properties |   1 +
 .../main/resources/sql/apache_inlong_manager.sql   |  48 ++++
 .../manager-web/sql/apache_inlong_manager.sql      |  49 +++++
 .../web/controller/StreamTransformController.java  |  84 +++++++
 100 files changed, 3124 insertions(+), 544 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
index e0857e5c0..72567bd8d 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.client;
 
-import org.apache.commons.compress.utils.Lists;
+import com.google.common.collect.Lists;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.PulsarBaseConf;
 import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.source.AutoPushSource;
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
index 9f3d6b9f8..af9a79968 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.client;
 
-import org.apache.commons.compress.utils.Lists;
+import com.google.common.collect.Lists;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.PulsarBaseConf;
 import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.source.AgentFileSource;
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
index 4bd4b3573..71f359ee3 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.client;
 
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.compress.utils.Lists;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.PulsarBaseConf;
 import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
index 21bb90d30..dcab288c8 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli;
 import lombok.Data;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.StreamSource;
 
diff --git a/inlong-manager/manager-client/pom.xml b/inlong-manager/manager-client/pom.xml
index 9985f26e3..0b893d37e 100644
--- a/inlong-manager/manager-client/pom.xml
+++ b/inlong-manager/manager-client/pom.xml
@@ -32,6 +32,12 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>manager-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 94f94c77f..c299e4867 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -53,12 +53,22 @@ public interface InlongGroup {
      */
     void update(InlongGroupConf conf) throws Exception;
 
+    /**
+     * Re-initialize the inlong group after its configuration has been updated.
+     * Must be invoked when the group is rejected, failed or started.
+     *
+     * @return inlong group info
+     */
+    InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;
+
     /**
      * Init inlong group on updated conf.
      * Must be invoked when group is rejected,failed or started
+     * This method is deprecated; use reInitOnUpdate instead.
      *
      * @return inlong group info
      */
+    @Deprecated
     InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;
 
     /**
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index bef4dccac..e6631a0f9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -59,6 +59,9 @@ public class InlongGroupConf {
     @ApiModelProperty("Need zookeeper support")
     private boolean zookeeperEnabled = true;
 
-    @ApiModelProperty("data proxy cluster id")
+    @ApiModelProperty("Data proxy cluster id")
     private Integer proxyClusterId;
+
+    @ApiModelProperty("Use light weight group")
+    private boolean lightWeight = false;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index ab9c17da8..cf18ac91a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.manager.client.api;
 
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+
 import java.util.List;
 import java.util.Map;
 
@@ -30,11 +33,19 @@ public abstract class InlongStream {
 
     public abstract Map<String, StreamSink> getSinks();
 
+    public abstract Map<String, StreamTransform> getTransforms();
+
     public abstract void addSource(StreamSource source);
 
     public abstract void addSink(StreamSink sink);
 
+    public abstract void addTransform(StreamTransform transform);
+
+    public abstract StreamPipeline createPipeline();
+
+    @Deprecated
     public abstract void updateSource(StreamSource source);
 
+    @Deprecated
     public abstract void updateSink(StreamSink sink);
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
index 9972c2979..f7ee29f68 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.client.api;
 
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+
 import java.util.List;
 
 public abstract class InlongStreamBuilder {
@@ -43,6 +45,13 @@ public abstract class InlongStreamBuilder {
      */
     public abstract InlongStreamBuilder fields(List<StreamField> fieldList);
 
+    /**
+     * Create stream transform
+     *
+     * @return inlong stream builder
+     */
+    public abstract InlongStreamBuilder transform(StreamTransform streamTransform);
+
     /**
      * Create data stream by builder
      *
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
index 2a4a3e9c4..8d7065c2f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index acb8c7130..70ad16e03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 0c18cd558..d5762e477 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -21,13 +21,14 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
 
 import java.util.List;
 import java.util.Map;
 
 @Data
 @ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public abstract class StreamSink extends StreamNode {
 
     @ApiModelProperty(value = "DataSink name", required = true)
     private String sinkName;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index dca118310..754cd7bb9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -22,10 +22,11 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
 
 @Data
 @ApiModel("Stream source configuration")
-public abstract class StreamSource {
+public abstract class StreamSource extends StreamNode {
 
     public enum State {
         INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
similarity index 65%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
index 0c18cd558..4c41ad813 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
@@ -20,25 +20,16 @@ package org.apache.inlong.manager.client.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
 
 @Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
-
-    @ApiModelProperty(value = "DataSink name", required = true)
-    private String sinkName;
-
-    @ApiModelProperty("Other properties if need")
-    private Map<String, Object> properties;
-
-    public abstract SinkType getSinkType();
-
-    public abstract List<SinkField> getSinkFields();
+@ApiModel("Stream Transform configuration")
+public abstract class StreamTransform extends StreamNode {
 
-    public abstract DataFormat getDataFormat();
+    @ApiModelProperty(value = "Transform name", required = true)
+    protected String transformName;
 
+    @ApiModelProperty(value = "Transform name", required = true)
+    protected TransformDefinition transformDefinition;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index d94521f4e..633ffd514 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -48,6 +48,11 @@ public class BlankInlongGroup implements InlongGroup {
         throw new UnsupportedOperationException("Inlong group is not exists");
     }
 
+    @Override
+    public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
     @Override
     public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
         throw new UnsupportedOperationException("Inlong group is not exists");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index cd248ffee..92f7772eb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -25,9 +25,9 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamTransform;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
@@ -43,17 +44,18 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 
 import java.util.List;
+import java.util.Map;
 
 public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     private InlongStreamImpl inlongStream;
 
-    private InlongStreamConf streamConf;
-
-    private InnerGroupContext groupContext;
-
     private InnerStreamContext streamContext;
 
     private InnerInlongManagerClient managerClient;
@@ -62,8 +64,6 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
             InlongStreamConf streamConf,
             InnerGroupContext groupContext,
             InnerInlongManagerClient managerClient) {
-        this.streamConf = streamConf;
-        this.groupContext = groupContext;
         this.managerClient = managerClient;
         if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
             groupContext.setStreamContextMap(Maps.newHashMap());
@@ -105,9 +105,20 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         return this;
     }
 
+    @Override
+    public InlongStreamBuilder transform(StreamTransform streamTransform) {
+        inlongStream.addTransform(streamTransform);
+        TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(streamTransform,
+                streamContext.getStreamInfo());
+        streamContext.setTransformRequest(transformRequest);
+        return this;
+    }
+
     @Override
     public InlongStream init() {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+        StreamPipeline streamPipeline = inlongStream.createPipeline();
+        streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
         String streamIndex = managerClient.createStreamInfo(streamInfo);
         streamInfo.setId(Double.valueOf(streamIndex).intValue());
         //Create source and update index
@@ -122,24 +133,27 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
             String sinkIndex = managerClient.createSink(sinkRequest);
             sinkRequest.setId(Double.valueOf(sinkIndex).intValue());
         }
+        //Create transform and update index
+        List<TransformRequest> transformRequests = Lists.newArrayList(streamContext.getTransformRequests().values());
+        for (TransformRequest transformRequest : transformRequests) {
+            String transformIndex = managerClient.createTransform(transformRequest);
+            transformRequest.setId(Double.valueOf(transformIndex).intValue());
+        }
         return inlongStream;
     }
 
     @Override
     public InlongStream initOrUpdate() {
         InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
+        StreamPipeline streamPipeline = inlongStream.createPipeline();
+        dataStreamInfo.setTempView(GsonUtil.toJson(streamPipeline));
         Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
         if (existMsg.getKey()) {
             Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
             if (updateMsg.getKey()) {
-                List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
-                for (SourceRequest sourceRequest : sourceRequests) {
-                    sourceRequest.setId(initOrUpdateSource(sourceRequest));
-                }
-                List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
-                for (SinkRequest sinkRequest : sinkRequests) {
-                    sinkRequest.setId(initOrUpdateSink(sinkRequest));
-                }
+                initOrUpdateTransform();
+                initOrUpdateSource();
+                initOrUpdateSink();
             } else {
                 throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
             }
@@ -149,6 +163,40 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         }
     }
 
+    private void initOrUpdateTransform() {
+        Map<String, TransformRequest> transformRequests = streamContext.getTransformRequests();
+        InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+        for (TransformResponse transformResponse : transformResponses) {
+            StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
+            String transformName = transform.getTransformName();
+            if (transformRequests.get(transformName) == null) {
+                TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+                        streamInfo);
+                boolean isDelete = managerClient.deleteTransform(transformRequest);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+                }
+            } else {
+                TransformRequest transformRequest = transformRequests.get(transformName);
+                Pair<Boolean, String> updateState = managerClient.updateTransform(transformRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
+                            updateState.getValue()));
+                }
+            }
+        }
+    }
+
+    private void initOrUpdateSource() {
+        List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
+        for (SourceRequest sourceRequest : sourceRequests) {
+            sourceRequest.setId(initOrUpdateSource(sourceRequest));
+        }
+    }
+
     private int initOrUpdateSource(SourceRequest sourceRequest) {
         String sourceType = sourceRequest.getSourceType();
         if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
@@ -184,6 +232,13 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         }
     }
 
+    private void initOrUpdateSink() {
+        List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
+        for (SinkRequest sinkRequest : sinkRequests) {
+            sinkRequest.setId(initOrUpdateSink(sinkRequest));
+        }
+    }
+
     private int initOrUpdateSink(SinkRequest sinkRequest) {
         String sinkType = sinkRequest.getSinkType();
         boolean flag = SinkType.HIVE.name().equals(sinkType) || SinkType.KAFKA.name().equals(sinkType)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index b8bb047ce..797a48b35 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -139,6 +139,11 @@ public class InlongGroupImpl implements InlongGroup {
         AssertUtil.isNull(errMsg, errMsg);
     }
 
+    @Override
+    public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
+        return initOnUpdate(conf);
+    }
+
     @Override
     public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
         update(conf);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 3991d825f..b3be9ff97 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -17,16 +17,19 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamTransform;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
@@ -36,9 +39,13 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 @Data
@@ -52,6 +59,8 @@ public class InlongStreamImpl extends InlongStream {
 
     private Map<String, StreamSink> streamSinks = Maps.newHashMap();
 
+    private Map<String, StreamTransform> streamTransforms = Maps.newHashMap();
+
     private List<StreamField> streamFields = Lists.newArrayList();
 
     public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
@@ -116,6 +125,11 @@ public class InlongStreamImpl extends InlongStream {
         return this.streamSinks;
     }
 
+    @Override
+    public Map<String, StreamTransform> getTransforms() {
+        return this.streamTransforms;
+    }
+
     @Override
     public void addSource(StreamSource source) {
         AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
@@ -136,6 +150,89 @@ public class InlongStreamImpl extends InlongStream {
         streamSinks.put(sinkName, sink);
     }
 
+    @Override
+    public void addTransform(StreamTransform transform) {
+        AssertUtil.notNull(transform.getTransformName(), "Transform name should not be empty");
+        String transformName = transform.getTransformName();
+        if (streamTransforms.get(transformName) != null) { // names are the map keys, so duplicates are rejected
+            throw new IllegalArgumentException(String.format("TransformName=%s has already been set", transformName));
+        }
+        streamTransforms.put(transformName, transform);
+    }
+
+    @Override
+    public StreamPipeline createPipeline() {
+        StreamPipeline streamPipeline = new StreamPipeline();
+        if (MapUtils.isEmpty(streamTransforms)) {
+            StreamNodeRelationship relationship = new StreamNodeRelationship();
+            relationship.setInputNodes(streamSources.keySet());
+            relationship.setOutputNodes(streamSinks.keySet());
+            streamPipeline.setPipeline(Lists.newArrayList(relationship));
+            return streamPipeline;
+        }
+        Map<Set<String>, List<StreamNodeRelationship>> relationshipMap = Maps.newHashMap();
+        // Create StreamNodeRelationships
+        // Check preNodes
+        for (StreamTransform streamTransform : streamTransforms.values()) {
+            String transformName = streamTransform.getTransformName();
+            Set<String> preNodes = streamTransform.getPreNodes();
+            StreamNodeRelationship relationship = new StreamNodeRelationship();
+            relationship.setInputNodes(preNodes);
+            relationship.setOutputNodes(Sets.newHashSet(transformName));
+            for (String preNode : preNodes) {
+                StreamTransform transform = streamTransforms.get(preNode);
+                if (transform != null) {
+                    transform.addPost(transformName);
+                }
+            }
+            relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+        }
+        // Check postNodes
+        for (StreamTransform streamTransform : streamTransforms.values()) {
+            String transformName = streamTransform.getTransformName();
+            Set<String> postNodes = streamTransform.getPostNodes();
+            Set<String> sinkSet = Sets.newHashSet();
+            for (String postNode : postNodes) {
+                StreamSink sink = streamSinks.get(postNode);
+                if (sink != null) {
+                    sinkSet.add(sink.getSinkName());
+                }
+            }
+            if (CollectionUtils.isNotEmpty(sinkSet)) {
+                StreamNodeRelationship relationship = new StreamNodeRelationship();
+                Set<String> preNodes = Sets.newHashSet(transformName);
+                relationship.setInputNodes(preNodes);
+                relationship.setOutputNodes(sinkSet);
+                relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+            }
+        }
+        List<StreamNodeRelationship> relationships = Lists.newArrayList();
+        // Merge StreamNodeRelationship with same preNodes
+        for (Map.Entry<Set<String>, List<StreamNodeRelationship>> entry : relationshipMap.entrySet()) {
+            List<StreamNodeRelationship> unmergedRelationships = entry.getValue();
+            if (unmergedRelationships.size() == 1) {
+                relationships.add(unmergedRelationships.get(0));
+            } else {
+                StreamNodeRelationship mergedRelationship = unmergedRelationships.get(0);
+                for (int index = 1; index < unmergedRelationships.size(); index++) {
+                    StreamNodeRelationship unmergedRelationship = unmergedRelationships.get(index);
+                    unmergedRelationship.getOutputNodes().stream()
+                            .forEach(outputNode -> mergedRelationship.addOutputNode(outputNode));
+                }
+                relationships.add(mergedRelationship);
+            }
+        }
+        streamPipeline.setPipeline(relationships);
+        Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
+        if (circleState.getLeft()) {
+            Pair<String, String> circleNodes = circleState.getRight();
+            throw new IllegalStateException(
+                    String.format("There is circle dependency in streamPipeline for node=%s and node=%s",
+                            circleNodes.getLeft(), circleNodes.getRight()));
+        }
+        return streamPipeline;
+    }
+
     @Override
     public void updateSource(StreamSource source) {
         AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index b957b3d9f..e8a2a6dd1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -51,6 +51,8 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -63,13 +65,13 @@ import java.util.List;
 @Slf4j
 public class InnerInlongManagerClient {
 
-    private static final String HTTP_PATH = "api/inlong/manager";
+    protected static final String HTTP_PATH = "api/inlong/manager";
 
-    private final OkHttpClient httpClient;
-    private final String host;
-    private final int port;
-    private final String uname;
-    private final String passwd;
+    protected final OkHttpClient httpClient;
+    protected final String host;
+    protected final int port;
+    protected final String uname;
+    protected final String passwd;
 
     public InnerInlongManagerClient(ClientConfiguration configuration) {
         this.host = configuration.getBindHost();
@@ -479,6 +481,111 @@ public class InnerInlongManagerClient {
         }
     }
 
+    public String createTransform(TransformRequest transformRequest) {
+        String path = HTTP_PATH + "/transform/save";
+        final String sink = GsonUtil.toJson(transformRequest);
+        final RequestBody transformBody = RequestBody.create(MediaType.parse("application/json"), sink);
+        final String url = formatUrl(path);
+        Request request = new Request.Builder()
+                .url(url)
+                .method("POST", transformBody)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            okhttp3.Response response = call.execute();
+            assert response.body() != null;
+            String body = response.body().string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+            return responseBody.getData().toString();
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong transform save failed: %s", e.getMessage()), e);
+        }
+    }
+
+    public List<TransformResponse> listTransform(String groupId, String streamId) {
+        final String path = HTTP_PATH + "/transform/list";
+        String url = formatUrl(path);
+        url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+        Request request = new Request.Builder().get()
+                .url(url)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            okhttp3.Response response = call.execute();
+            String body = response.body().string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed:%s", responseBody.getErrMsg()));
+            return InlongParser.parseTransformList(responseBody);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong transform list failed with ex:%s", e.getMessage()), e);
+        }
+    }
+
+    public Pair<Boolean, String> updateTransform(TransformRequest transformRequest) {
+        final String path = HTTP_PATH + "/transform/update";
+        final String url = formatUrl(path);
+        final String transform = GsonUtil.toJson(transformRequest);
+        final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), transform);
+        Request request = new Request.Builder()
+                .method("POST", storageBody)
+                .url(url)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            okhttp3.Response response = call.execute();
+            String body = response.body().string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            if (responseBody.getData() != null) {
+                return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg());
+            } else {
+                return Pair.of(false, responseBody.getErrMsg());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong transform update failed with ex:%s", e.getMessage()), e);
+        }
+    }
+
+    public boolean deleteTransform(TransformRequest transformRequest) {
+        AssertUtil.notEmpty(transformRequest.getInlongGroupId(), "inlongGroupId should not be null");
+        AssertUtil.notEmpty(transformRequest.getInlongStreamId(), "inlongStreamId should not be null");
+        AssertUtil.notEmpty(transformRequest.getTransformName(), "transformName should not be null");
+        final String path = HTTP_PATH + "/transform/delete";
+        String url = formatUrl(path);
+        url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s&transformName=%s", url,
+                transformRequest.getInlongGroupId(),
+                transformRequest.getInlongStreamId(),
+                transformRequest.getTransformName());
+        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
+        Request request = new Request.Builder()
+                .url(url)
+                .method("DELETE", requestBody)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            okhttp3.Response response = call.execute();
+            assert response.body() != null;
+            String body = response.body().string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+            return Boolean.parseBoolean(responseBody.getData().toString());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Inlong transform delete failed: %s", e.getMessage()), e);
+        }
+    }
+
     public String createSink(SinkRequest sinkRequest) {
         String path = HTTP_PATH + "/sink/save";
         final String sink = GsonUtil.toJson(sinkRequest);
@@ -758,11 +865,11 @@ public class InnerInlongManagerClient {
         }
     }
 
-    private void assertHttpSuccess(okhttp3.Response response, String body, String path) {
+    protected void assertHttpSuccess(okhttp3.Response response, String body, String path) {
         AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request=%s failed: %s", path, body));
     }
 
-    private String formatUrl(String path) {
+    protected String formatUrl(String path) {
         return String.format("http://%s:%s/%s?username=%s&password=%s", host, port, path, uname, passwd);
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 5b85758b3..86220b5dc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
 
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,8 @@ public class InnerStreamContext {
 
     private Map<String, SinkRequest> sinkRequests = Maps.newHashMap();
 
+    private Map<String, TransformRequest> transformRequests = Maps.newHashMap();
+
     public InnerStreamContext(InlongStreamInfo streamInfo) {
         this.streamInfo = streamInfo;
     }
@@ -54,4 +57,7 @@ public class InnerStreamContext {
         this.sinkRequests.put(sinkRequest.getSinkName(), sinkRequest);
     }
 
+    public void setTransformRequest(TransformRequest transformRequest) {
+        this.transformRequests.put(transformRequest.getTransformName(), transformRequest);
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java
new file mode 100644
index 000000000..11d5a466b
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.transform;
+
+import io.swagger.annotations.ApiModel;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+@ApiModel("StreamTransform with multiple pre stream nodes, such as join")
+public class MultiDependencyTransform extends StreamTransform {
+
+    /**
+     * Constructor of MultiDependencyTransform
+     *
+     * @param transformName name of this transform, must not be null
+     * @param transformDefinition definition of this transform, must not be null
+     * @param preNodes name of pre streamNodes, if pre streamNode is streamSource, then preNode is sourceName
+     *         if pre streamNode is streamTransform, preNode is transformName
+     */
+    public MultiDependencyTransform(String transformName, TransformDefinition transformDefinition, String... preNodes) {
+        AssertUtil.notNull(transformDefinition, "TransformDefinition should not be null");
+        this.transformDefinition = transformDefinition;
+        AssertUtil.notNull(transformName, "TransformName should not be empty");
+        this.transformName = transformName;
+        AssertUtil.noNullElements(preNodes, "Pre streamNode should not be null");
+        for (String preNode : preNodes) {
+            this.addPre(preNode);
+        }
+    }
+
+    /**
+     * Constructor of MultiDependencyTransform
+     *
+     * @param transformName name of this transform, must not be null
+     * @param transformDefinition definition of this transform, must not be null
+     * @param preNodes names of pre streamNodes, if pre streamNode is streamSource, then preNode is sourceName
+     *         if pre streamNode is streamTransform, preNode is transformName
+     * @param postNodes names of post streamNodes, may be null; if post streamNode is streamSink, then postNode is
+     *         sinkName, if post streamNode is streamTransform, postNode is transformName
+     */
+    public MultiDependencyTransform(String transformName, TransformDefinition transformDefinition,
+            List<String> preNodes, List<String> postNodes) {
+        this(transformName, transformDefinition, preNodes.toArray(new String[preNodes.size()]));
+        if (postNodes != null) {
+            for (String postNode : postNodes) {
+                this.addPost(postNode);
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java
new file mode 100644
index 000000000..e58851c05
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.transform;
+
+import io.swagger.annotations.ApiModel;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+@ApiModel("StreamTransform with one pre stream node, such as filter, splitter, etc")
+public class SingleDependencyTransform extends StreamTransform {
+
+    /**
+     * Constructor of SingleDependencyTransform
+     *
+     * @param transformName name of this transform, must not be null
+     * @param transformDefinition definition of this transform, must not be null
+     * @param preNode name of pre streamNode, if pre streamNode is streamSource, then preNode is sourceName
+     *         if pre streamNode is streamTransform, preNode is transformName
+     */
+    public SingleDependencyTransform(String transformName, TransformDefinition transformDefinition, String preNode) {
+        AssertUtil.notNull(transformDefinition, "TransformDefinition should not be null");
+        this.transformDefinition = transformDefinition;
+        AssertUtil.notNull(transformName, "TransformName should not be empty");
+        this.transformName = transformName;
+        AssertUtil.notNull(preNode, "Pre streamNode should not be null");
+        this.addPre(preNode);
+    }
+
+    /**
+     * Constructor of SingleDependencyTransform
+     *
+     * @param transformName name of this transform, must not be null
+     * @param transformDefinition definition of this transform, must not be null
+     * @param preNode name of pre streamNode, if pre streamNode is streamSource, then preNode is sourceName
+     *         if pre streamNode is streamTransform, preNode is transformName
+     * @param postNodes names of post streamNodes, may be null; if post streamNode is streamSink, then postNode is
+     *         sinkName, if post streamNode is streamTransform, postNode is transformName
+     */
+    public SingleDependencyTransform(String transformName, TransformDefinition transformDefinition, String preNode,
+            String... postNodes) {
+        this(transformName, transformDefinition, preNode);
+        if (postNodes != null) {
+            for (String postNode : postNodes) {
+                this.addPost(postNode);
+            }
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
index 86ac32d04..90ce0478a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
@@ -107,6 +107,12 @@ public class AssertUtil {
         }
     }
 
+    public static void notEmpty(String obj, String message) {
+        if (StringUtils.isEmpty(obj)) {
+            throw new IllegalStateException(message);
+        }
+    }
+
     public static void notEmpty(Object[] array) {
         notEmpty(array, "[Assertion failed] - this array must not be empty: it must contain at least 1 element");
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index ba100dcc2..a161d1da9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -52,6 +52,7 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
@@ -225,6 +226,14 @@ public class InlongParser {
         }
     }
 
+    public static List<TransformResponse> parseTransformList(Response response) {
+        Object data = response.getData();
+        String pageInfoJson = GsonUtil.toJson(data);
+        return GsonUtil.fromJson(pageInfoJson,
+                new TypeToken<List<TransformResponse>>() {
+                }.getType());
+    }
+
     public static PageInfo<SinkListResponse> parseSinkList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 590b9fbae..9fe498b85 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.client.api.util;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 2977df9ae..cc9106d70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -46,9 +46,14 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Transfer the inlong stream source.
@@ -107,42 +112,43 @@ public class InlongStreamSourceTransfer {
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
-    private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) {
+    private static KafkaSource parseKafkaSource(KafkaSourceResponse response) {
         KafkaSource kafkaSource = new KafkaSource();
-        kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
-        kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
-        DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+        kafkaSource.setSourceName(response.getSourceName());
+        kafkaSource.setConsumerGroup(response.getGroupId());
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
         kafkaSource.setDataFormat(dataFormat);
-        kafkaSource.setState(State.parseByStatus(kafkaSourceResponse.getStatus()));
-        kafkaSource.setAgentIp(kafkaSourceResponse.getAgentIp());
-        kafkaSource.setTopic(kafkaSourceResponse.getTopic());
-        kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
-        kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
-        kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
-        kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+        kafkaSource.setState(State.parseByStatus(response.getStatus()));
+        kafkaSource.setAgentIp(response.getAgentIp());
+        kafkaSource.setTopic(response.getTopic());
+        kafkaSource.setBootstrapServers(response.getBootstrapServers());
+        kafkaSource.setByteSpeedLimit(response.getByteSpeedLimit());
+        kafkaSource.setTopicPartitionOffset(response.getTopicPartitionOffset());
+        kafkaSource.setRecordSpeedLimit(response.getRecordSpeedLimit());
         kafkaSource.setSyncType(SyncType.FULL);
-        kafkaSource.setDatabasePattern(kafkaSourceResponse.getDatabasePattern());
-        kafkaSource.setTablePattern(kafkaSourceResponse.getTablePattern());
-        kafkaSource.setIgnoreParseErrors(kafkaSourceResponse.isIgnoreParseErrors());
-        kafkaSource.setTimestampFormatStandard(kafkaSourceResponse.getTimestampFormatStandard());
+        kafkaSource.setDatabasePattern(response.getDatabasePattern());
+        kafkaSource.setTablePattern(response.getTablePattern());
+        kafkaSource.setIgnoreParseErrors(response.isIgnoreParseErrors());
+        kafkaSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+        kafkaSource.setFields(parseStreamFields(response.getFieldList()));
         return kafkaSource;
     }
 
-    private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaResponse) {
+    private static KafkaSource parseKafkaSource(KafkaSourceListResponse response) {
         KafkaSource kafkaSource = new KafkaSource();
-        kafkaSource.setSourceName(kafkaResponse.getSourceName());
-        kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
-        kafkaSource.setState(State.parseByStatus(kafkaResponse.getStatus()));
-        DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
+        kafkaSource.setSourceName(response.getSourceName());
+        kafkaSource.setConsumerGroup(response.getGroupId());
+        kafkaSource.setState(State.parseByStatus(response.getStatus()));
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
         kafkaSource.setDataFormat(dataFormat);
-        kafkaSource.setTopic(kafkaResponse.getTopic());
-        kafkaSource.setBootstrapServers(kafkaResponse.getBootstrapServers());
-        kafkaSource.setByteSpeedLimit(kafkaResponse.getByteSpeedLimit());
-        kafkaSource.setTopicPartitionOffset(kafkaResponse.getTopicPartitionOffset());
+        kafkaSource.setTopic(response.getTopic());
+        kafkaSource.setBootstrapServers(response.getBootstrapServers());
+        kafkaSource.setByteSpeedLimit(response.getByteSpeedLimit());
+        kafkaSource.setTopicPartitionOffset(response.getTopicPartitionOffset());
 
-        KafkaOffset offset = KafkaOffset.forName(kafkaResponse.getAutoOffsetReset());
+        KafkaOffset offset = KafkaOffset.forName(response.getAutoOffsetReset());
         kafkaSource.setAutoOffsetReset(offset);
-        kafkaSource.setRecordSpeedLimit(kafkaResponse.getRecordSpeedLimit());
+        kafkaSource.setRecordSpeedLimit(response.getRecordSpeedLimit());
         kafkaSource.setSyncType(SyncType.FULL);
         return kafkaSource;
     }
@@ -171,6 +177,7 @@ public class InlongStreamSourceTransfer {
         if (StringUtils.isNotBlank(response.getTableWhiteList())) {
             binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
         }
+        binlogSource.setFields(parseStreamFields(response.getFieldList()));
         return binlogSource;
     }
 
@@ -208,6 +215,7 @@ public class InlongStreamSourceTransfer {
         fileSource.setPattern(response.getPattern());
         fileSource.setIp(response.getIp());
         fileSource.setTimeOffset(response.getTimeOffset());
+        fileSource.setFields(parseStreamFields(response.getFieldList()));
         return fileSource;
     }
 
@@ -228,6 +236,7 @@ public class InlongStreamSourceTransfer {
         autoPushSource.setState(State.parseByStatus(response.getStatus()));
         autoPushSource.setDataFormat(DataFormat.NONE);
         autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
+        autoPushSource.setFields(parseStreamFields(response.getFieldList()));
         return autoPushSource;
     }
 
@@ -240,11 +249,11 @@ public class InlongStreamSourceTransfer {
         return autoPushSource;
     }
 
-    private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
+    private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
         KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
         sourceRequest.setSourceName(kafkaSource.getSourceName());
-        sourceRequest.setInlongGroupId(stream.getInlongGroupId());
-        sourceRequest.setInlongStreamId(stream.getInlongStreamId());
+        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         sourceRequest.setSourceType(kafkaSource.getSourceType().getType());
         sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
         sourceRequest.setTopic(kafkaSource.getTopic());
@@ -258,6 +267,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setTablePattern(kafkaSource.getTablePattern());
         sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
         sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
+        sourceRequest.setFieldList(createStreamFields(kafkaSource.getFields(), streamInfo));
         return sourceRequest;
     }
 
@@ -288,6 +298,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setSnapshotMode("initial");
         sourceRequest.setIntervalMs("500");
         sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+        sourceRequest.setFieldList(createStreamFields(binlogSource.getFields(), streamInfo));
         return sourceRequest;
     }
 
@@ -308,6 +319,7 @@ public class InlongStreamSourceTransfer {
         }
         sourceRequest.setPattern(fileSource.getPattern());
         sourceRequest.setTimeOffset(fileSource.getTimeOffset());
+        sourceRequest.setFieldList(createStreamFields(fileSource.getFields(), streamInfo));
         return sourceRequest;
     }
 
@@ -322,6 +334,23 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         sourceRequest.setSourceType(source.getSourceType().getType());
         sourceRequest.setDataProxyGroup(source.getDataProxyGroup());
+        sourceRequest.setFieldList(createStreamFields(source.getFields(), streamInfo));
         return sourceRequest;
     }
+
+    /** Delegate to InlongStreamTransfer#createStreamFields, or return null when fields is null or empty. */
+    private static List<InlongStreamFieldInfo> createStreamFields(
+            List<StreamField> fields, InlongStreamInfo streamInfo) {
+        return CollectionUtils.isEmpty(fields)
+                ? null
+                : InlongStreamTransfer.createStreamFields(fields, streamInfo);
+    }
+
+    /** Map every field info to a StreamField via CommonBeanUtils; return null when the list is null or empty. */
+    private static List<StreamField> parseStreamFields(List<InlongStreamFieldInfo> fields) {
+        return CollectionUtils.isEmpty(fields) ? null
+                : fields.stream()
+                        .map(info -> CommonBeanUtils.copyProperties(info, StreamField::new))
+                        .collect(Collectors.toList());
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index e4cb801c7..f72e27d7f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.api.util;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
new file mode 100644
index 000000000..d1fd7d76e
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
+import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.TransformDefinitionUtils;
+
+import java.util.List;
+
+/** Converts between client-side {@link StreamTransform} objects and manager transform requests/responses. */
+public class InlongStreamTransformTransfer {
+
+    /** Build the save request for a transform of the given stream; name and definition are mandatory. */
+    public static TransformRequest createTransformRequest(StreamTransform streamTransform,
+            InlongStreamInfo streamInfo) {
+        TransformRequest transformRequest = new TransformRequest();
+        Preconditions.checkNotEmpty(streamTransform.getTransformName(), "TransformName should not be null");
+        transformRequest.setTransformName(streamTransform.getTransformName());
+        transformRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        transformRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        Preconditions.checkNotNull(streamTransform.getTransformDefinition(), "TransformDefinition should not be null");
+        TransformDefinition transformDefinition = streamTransform.getTransformDefinition();
+        transformRequest.setTransformType(transformDefinition.getTransformType().getType());
+        transformRequest.setVersion(1);
+        transformRequest.setTransformDefinition(GsonUtil.toJson(transformDefinition));
+        if (CollectionUtils.isNotEmpty(streamTransform.getPreNodes())) {
+            transformRequest.setPreNodeNames(Joiner.on(",").join(streamTransform.getPreNodes()));
+        }
+        if (CollectionUtils.isNotEmpty(streamTransform.getPostNodes())) {
+            transformRequest.setPostNodeNames(Joiner.on(",").join(streamTransform.getPostNodes()));
+        }
+        return transformRequest;
+    }
+
+    /**
+     * Rebuild the client-side transform from a manager response: more than one pre node yields a
+     * MultiDependencyTransform, otherwise a SingleDependencyTransform. Fails fast when pre node names are missing.
+     */
+    public static StreamTransform parseStreamTransform(TransformResponse transformResponse) {
+        TransformType transformType = TransformType.forType(transformResponse.getTransformType());
+        TransformDefinition transformDefinition = TransformDefinitionUtils.parseTransformDefinition(
+                transformResponse.getTransformDefinition(), transformType);
+        String transformName = transformResponse.getTransformName();
+        String preNodeNames = transformResponse.getPreNodeNames();
+        // Splitter#splitToList rejects null with a bare NPE; report the missing pre nodes explicitly instead
+        Preconditions.checkNotEmpty(preNodeNames, "PreNodeNames should not be empty");
+        List<String> preNodes = Splitter.on(",").splitToList(preNodeNames);
+        StreamTransform streamTransform = preNodes.size() > 1
+                ? new MultiDependencyTransform(transformName, transformDefinition, preNodes.toArray(new String[]{}))
+                : new SingleDependencyTransform(transformName, transformDefinition, preNodes.get(0));
+        String postNodeNames = transformResponse.getPostNodeNames();
+        if (StringUtils.isNotEmpty(postNodeNames)) {
+            streamTransform.setPostNodes(Sets.newHashSet(Splitter.on(",").splitToList(postNodeNames)));
+        }
+        return streamTransform;
+    }
+}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
new file mode 100644
index 000000000..bbe8e1bde
--- /dev/null
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.InlongStream;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
+import org.apache.inlong.manager.client.api.sink.KafkaSink;
+import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
+import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition;
+import org.assertj.core.util.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InlongStreamImplTest {
+
+    @Test
+    public void testCreatePipeline() {
+        InlongStream inlongStream = new InlongStreamImpl();
+        // two sources, A and B, which are the pre nodes of the joiner transform C
+        KafkaSource kafkaSource = new KafkaSource();
+        kafkaSource.setSourceName("A");
+        MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
+        mySQLBinlogSource.setSourceName("B");
+        inlongStream.addSource(kafkaSource);
+        inlongStream.addSource(mySQLBinlogSource);
+        // four sinks: E and F follow filter D, I is passed to splitter G, M is not named by any transform
+        ClickHouseSink clickHouseSink = new ClickHouseSink();
+        clickHouseSink.setSinkName("E");
+        HiveSink hiveSink = new HiveSink();
+        hiveSink.setSinkName("F");
+        KafkaSink kafkaSink1 = new KafkaSink();
+        kafkaSink1.setSinkName("I");
+        KafkaSink kafkaSink2 = new KafkaSink();
+        kafkaSink2.setSinkName("M");
+        inlongStream.addSink(clickHouseSink);
+        inlongStream.addSink(hiveSink);
+        inlongStream.addSink(kafkaSink1);
+        inlongStream.addSink(kafkaSink2);
+        // transforms: joiner C over sources A and B, filter D and splitter G both depending on C
+        StreamTransform multiDependencyTransform = new MultiDependencyTransform(
+                "C",
+                new JoinerDefinition(kafkaSource, mySQLBinlogSource, Lists.newArrayList(), Lists.newArrayList(),
+                        JoinMode.INNER_JOIN),
+                "A", "B");
+        StreamTransform singleDependencyTransform1 = new SingleDependencyTransform(
+                "D", new FilterDefinition(FilterStrategy.REMOVE, Lists.newArrayList()), "C", "E", "F"
+        );
+
+        StreamTransform singleDependencyTransform2 = new SingleDependencyTransform(
+                "G", new SplitterDefinition(Lists.newArrayList()), "C", "I"
+        );
+        inlongStream.addTransform(multiDependencyTransform);
+        inlongStream.addTransform(singleDependencyTransform1);
+        inlongStream.addTransform(singleDependencyTransform2);
+        StreamPipeline streamPipeline = inlongStream.createPipeline();
+        String pipelineView = GsonUtil.toJson(streamPipeline);
+        Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"C\"],\"outputNodes\":[\"D\",\"G\"]"));
+        Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"D\"],\"outputNodes\":[\"E\",\"F\"]}"));
+    }
+}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
index 9742174ba..3c51ea383 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
@@ -41,5 +41,4 @@ public class InnerInlongManagerClientTest {
         List<FullStreamResponse> fullStreamResponseList = innerInlongManagerClient.listStreamInfo("test");
         Assert.assertNull(fullStreamResponseList);
     }
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 778a796ff..ae26600b3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -23,8 +23,6 @@ package org.apache.inlong.manager.common.enums;
 @Deprecated
 public class Constant {
 
-    public static final Integer UN_DELETED = 0;
-    public static final Integer IS_DELETED = 1;
     public static final String URL_SPLITTER = ",";
     public static final String HOST_SPLITTER = ":";
 
@@ -38,28 +36,11 @@ public class Constant {
 
     public static final String DATA_TYPE_KEY_VALUE = "KEY-VALUE";
 
-    public static final String FILE_FORMAT_TEXT = "TextFile";
-
-    public static final String FILE_FORMAT_ORC = "OrcFile";
-
-    public static final String FILE_FORMAT_SEQUENCE = "SequenceFile";
-
-    public static final String FILE_FORMAT_PARQUET = "Parquet";
-
     public static final String SCHEMA_M0_DAY = "m0_day";
 
     public static final String CLUSTER_TUBE = "TUBE";
     public static final String CLUSTER_PULSAR = "PULSAR";
     public static final String CLUSTER_TDMQ_PULSAR = "TDMQ_PULSAR";
-    public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
-
-    public static final String ID_IS_EMPTY = "primary key is empty";
-
-    public static final String GROUP_ID_IS_EMPTY = "inlong group id is empty";
-
-    public static final String STREAM_ID_IS_EMPTY = "inlong stream id is empty";
-
-    public static final String REQUEST_IS_EMPTY = "request is empty";
 
     public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
 
@@ -75,16 +56,4 @@ public class Constant {
 
     public static final Integer DISABLE_CREATE_RESOURCE = 0; // Disable create resource
 
-    public static final String CLUSTER_TUBE_MANAGER = "cluster_tube_manager";
-
-    public static final String CLUSTER_TUBE_CLUSTER_ID = "cluster_tube_clusterId";
-
-    public static final String PULSAR_ADMINURL = "pulsar_adminUrl";
-
-    public static final String PULSAR_SERVICEURL = "pulsar_serviceUrl";
-
-    public static final String TUBE_MASTER_URL = "tube_masterUrl";
-
-    public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
index 34388004c..59c9146b2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
 /**
  * Entity status enum
  */
+@Deprecated
 public enum DBCollectorDetailTaskEntityStatus {
 
     INIT(0, "first inserted"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
index 1ba3fcd8d..db43a0439 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
 /**
  * Constant for db collector task
  */
+@Deprecated
 public class DBCollectorTaskConstant {
 
     public static final String INTERFACE_VERSION = "1.0";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
index 796672458..5539990df 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
 /**
  * Operation type
  */
+@Deprecated
 public enum DBCollectorTaskReturnCode {
 
     SUCC(0, "success"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ff2ba5ec5..c16ceb3d5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -23,6 +23,10 @@ public enum ErrorCodeEnum {
     PERMISSION_REQUIRED(2003, "The current user does not have operation authority"),
     AUTHENTICATION_REQUIRED(2004, "Authentication failed"),
 
+    ID_IS_EMPTY(101, "Primary key is empty"),
+    GROUP_ID_IS_EMPTY(102, "Inlong group id is empty"),
+    STREAM_ID_IS_EMPTY(103, "Inlong stream id is empty"),
+    REQUEST_IS_EMPTY(104, "Request is empty"),
     USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
 
     GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
@@ -65,13 +69,8 @@ public enum ErrorCodeEnum {
     SOURCE_ALREADY_EXISTS(1304, "Source already exist with the groupId and streamId"),
     SOURCE_SAVE_FAILED(1305, "Failed to save or update source info"),
     SOURCE_OPT_NOT_ALLOWED(1306, "Current status does not allow add/modification/delete source info"),
-
-    SOURCE_DUPLICATE(1301, "Stream source already exists"),
-    SOURCE_BASIC_NOT_FOUND(1302, "The basic information of the stream source does not exist"),
-    SOURCE_DETAIL_NOT_FOUND(1303, "Stream source info does not exist"),
-    SOURCE_TYPE_NOT_SUPPORTED(1304, "Source type is not supported"),
-    SOURCE_BASIC_DELETE_HAS_DETAIL(1305,
-            "The stream source contains detailed info and is not allowed to be deleted"),
+    SOURCE_TYPE_NOT_SAME(1307, "Expected source type is %s, but found %s"),
+    SOURCE_NAME_IS_NULL(1308, "Source name is null"),
 
     HIVE_OPERATION_FAILED(1311, "Hive operation failed"),
 
@@ -90,7 +89,11 @@ public enum ErrorCodeEnum {
     PARTITION_FIELD_NAME_IS_EMPTY(1412, "Partition field name cannot be empty"),
     PARTITION_FIELD_NOT_FOUND(1413, "Sink partition field [%s] not found in sink field list"),
     PARTITION_FIELD_NO_SOURCE_FIELD(1414, "Sink partition field [%s] must have a related source field name"),
+    SINK_TYPE_NOT_SAME(1415, "Expected sink type is %s, but found %s"),
+    SINK_NAME_IS_NULL(1416, "Sink name is null"),
 
+    TRANSFORM_TYPE_IS_NULL(1500, "Transform type is null"),
+    TRANSFORM_NAME_IS_NULL(1501, "Transform name is null"),
 
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
index 855f74035..847652d44 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
@@ -39,6 +39,6 @@ public enum MQType {
                 return mqType;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+        throw new IllegalArgumentException(String.format("Unsupported queue=%s for Inlong", type));
     }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 3eea7f22a..1c51ef520 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -28,9 +28,6 @@ public enum SinkType {
     public static final String SINK_ICEBERG = "ICEBERG";
     public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
 
-    public static final String SINK_TYPE_IS_EMPTY = "Sink type is empty";
-    public static final String SINK_TYPE_NOT_SAME = "Expected sink type is %s, but found %s";
-
     /**
      * Get the SinkType enum via the given sinkType string
      */
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 9d6415ca9..2ea9726fa 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -39,9 +39,6 @@ public enum SourceType {
     public static final String SOURCE_BINLOG = "BINLOG";
     public static final String SOURCE_KAFKA = "KAFKA";
 
-    public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
-    public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
-
     @Getter
     private final String type;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
similarity index 55%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index 855f74035..b4d326b91 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -19,26 +19,49 @@ package org.apache.inlong.manager.common.enums;
 
 import lombok.Getter;
 
-public enum MQType {
+import java.util.Locale;
 
-    PULSAR("PULSAR"),
-    TUBE("TUBE"),
-    TDMQ_PULSAR("TDMQ_PULSAR"),
-    NONE("NONE");
+public enum TransformType {
+
+    /**
+     * Replace string field based on regex
+     */
+    STRING_REPLACER("string_replacer"),
+
+    /**
+     * Split field by separator
+     */
+    SPLITTER("splitter"),
+
+    /**
+     * Filter stream records by the given rules
+     */
+    FILTER("filter"),
+
+    /**
+     * Remove duplicate records based on the given fields
+     */
+    DE_DUPLICATION("de_duplication"),
+
+    /**
+     * Join different sources into one stream
+     */
+    JOINER("joiner");
 
     @Getter
     private String type;
 
-    MQType(String type) {
+    TransformType(String type) {
         this.type = type;
     }
 
-    public static MQType forType(String type) {
-        for (MQType mqType : values()) {
-            if (mqType.getType().equals(type)) {
-                return mqType;
+    public static TransformType forType(String type) {
+        for (TransformType transformType : values()) {
+            if (transformType.getType().equals(type) || transformType.getType().toUpperCase(Locale.ROOT).equals(type)) {
+                return transformType;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+        throw new IllegalArgumentException(String.format("Unsupported transform=%s for Inlong", type));
     }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 1cc9bf76f..6cb66e5e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 
 import javax.validation.constraints.NotNull;
+import java.util.List;
 
 /**
  * Request of source
@@ -69,4 +71,7 @@ public class SourceRequest {
     @ApiModelProperty("Version")
     private Integer version;
 
+    @ApiModelProperty("Field list, only support when inlong group in light weight mode")
+    private List<InlongStreamFieldInfo> fieldList;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 9c09ae51b..75b177a6a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 
 import java.util.Date;
+import java.util.List;
 
 /**
  * Response of the stream source
@@ -84,4 +86,7 @@ public class SourceResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
+    @ApiModelProperty(value = "Field list")
+    private List<InlongStreamFieldInfo> fieldList;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index b23e42db8..ba68f6d43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -83,6 +83,9 @@ public class InlongStreamRequest extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "Names of responsible persons, separated by commas")
     private String inCharges;
 
+    @ApiModelProperty(value = "StreamPipeline snapshot of stream, string in JSON format")
+    private String tempView;
+
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
similarity index 82%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index 3f487d0a5..b6a641d81 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.common.pojo.stream;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -30,6 +30,9 @@ import org.apache.inlong.manager.common.enums.FieldType;
 @ApiModel("Stream field configuration")
 public class StreamField {
 
+    public static final StreamField PROCESSING_TIME = new StreamField(100, FieldType.BIGINT, "PROCESSING_TIME",
+            null, null, 1);
+
     public StreamField(int index, FieldType fieldType, String fieldName, String fieldComment, String fieldValue) {
         this.id = index;
         this.fieldType = fieldType;
@@ -38,6 +41,18 @@ public class StreamField {
         this.fieldValue = fieldValue;
     }
 
+    /**
+     * Create a stream field with an explicit meta-field flag.
+     * The first five arguments are passed unchanged to the five-argument constructor.
+     *
+     * @param isMetaField value stored in {@code isMetaField}; the PROCESSING_TIME constant of this class passes 1
+     */
+    public StreamField(int index, FieldType fieldType, String fieldName, String fieldComment, String fieldValue,
+            Integer isMetaField) {
+        this(index, fieldType, fieldName, fieldComment, fieldValue);
+        this.isMetaField = isMetaField;
+    }
+
     @ApiModelProperty("Field index")
     private Integer id;
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
similarity index 52%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
index 0c18cd558..bcea68d3c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
@@ -15,30 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.common.pojo.stream;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
+import com.google.common.collect.Sets;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
+/**
+ * A node of a stream, holding the names of its pre nodes and post nodes together with its fields.
+ */
 @Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public class StreamNode {
 
-    @ApiModelProperty(value = "DataSink name", required = true)
-    private String sinkName;
+    protected Set<String> preNodes;
 
-    @ApiModelProperty("Other properties if need")
-    private Map<String, Object> properties;
+    protected Set<String> postNodes;
 
-    public abstract SinkType getSinkType();
+    protected List<StreamField> fields;
 
-    public abstract List<SinkField> getSinkFields();
-
-    public abstract DataFormat getDataFormat();
+    /**
+     * Add a pre node, creating the pre-node set on first use.
+     *
+     * @param pre pre node to add; validated with {@code Preconditions.checkNotEmpty}
+     */
+    public void addPre(String pre) {
+        Preconditions.checkNotEmpty(pre, "Pre node should not be empty");
+        if (preNodes == null) {
+            preNodes = Sets.newHashSet();
+        }
+        preNodes.add(pre);
+    }
 
+    /**
+     * Add a post node, creating the post-node set on first use.
+     *
+     * @param post post node to add; validated with {@code Preconditions.checkNotEmpty}
+     */
+    public void addPost(String post) {
+        Preconditions.checkNotEmpty(post, "Post node should not be empty");
+        if (postNodes == null) {
+            postNodes = Sets.newHashSet();
+        }
+        postNodes.add(post);
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
new file mode 100644
index 000000000..280e6c0e4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Sets;
+import lombok.Data;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.Set;
+
+/**
+ * Relationship between stream nodes: a set of input nodes and the set of output nodes they feed.
+ */
+@Data
+public class StreamNodeRelationship {
+
+    private Set<String> inputNodes;
+
+    private Set<String> outputNodes;
+
+    /**
+     * Create a relationship with empty, mutable input and output node sets.
+     */
+    public StreamNodeRelationship() {
+        this(Sets.newHashSet(), Sets.newHashSet());
+    }
+
+    /**
+     * Create a relationship backed by the given sets (they are stored as-is, not copied).
+     *
+     * @param inputNodes input nodes
+     * @param outputNodes output nodes
+     */
+    public StreamNodeRelationship(Set<String> inputNodes, Set<String> outputNodes) {
+        this.inputNodes = inputNodes;
+        this.outputNodes = outputNodes;
+    }
+
+    /**
+     * Add an input node.
+     *
+     * @param inputNode input node to add; validated with {@code Preconditions.checkNotEmpty}
+     */
+    public void addInputNode(String inputNode) {
+        Preconditions.checkNotEmpty(inputNode, "Input node should not be empty");
+        inputNodes.add(inputNode);
+    }
+
+    /**
+     * Add an output node.
+     *
+     * @param outputNode output node to add; validated with {@code Preconditions.checkNotEmpty}
+     */
+    public void addOutputNode(String outputNode) {
+        Preconditions.checkNotEmpty(outputNode, "Output node should not be empty");
+        outputNodes.add(outputNode);
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
new file mode 100644
index 000000000..6b12abb69
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A pipeline of an Inlong stream, described as a list of relationships between stream nodes.
+ */
+@Data
+public class StreamPipeline {
+
+    private List<StreamNodeRelationship> pipeline;
+
+    /**
+     * Create a pipeline without any relationship.
+     */
+    public StreamPipeline() {
+        this(Lists.newArrayList());
+    }
+
+    /**
+     * Create a pipeline backed by the given relationship list (stored as-is, not copied).
+     *
+     * @param pipeline relationships of the pipeline, checked with {@code Preconditions.checkNotNull}
+     */
+    public StreamPipeline(List<StreamNodeRelationship> pipeline) {
+        Preconditions.checkNotNull(pipeline, "Pipeline should not be null");
+        this.pipeline = pipeline;
+    }
+
+    /**
+     * Append a relationship to the pipeline.
+     *
+     * @param relationship relationship to append
+     */
+    public void addRelationShip(StreamNodeRelationship relationship) {
+        pipeline.add(relationship);
+    }
+
+    /**
+     * Check whether the pipeline contains a circle.
+     * Relationships are processed in order: every (input, output) pair is recorded as an edge, and a circle
+     * is reported as soon as the output node of the newly recorded edge can reach its input node.
+     *
+     * @return a pair whose left value tells whether a circle was found; if so, the right value holds the
+     *         (input, output) node names of the edge that closed the circle, otherwise it is null
+     */
+    public Pair<Boolean, Pair<String, String>> hasCircle() {
+        Map<String, Set<String>> priorityMap = Maps.newHashMap();
+        for (StreamNodeRelationship relationship : pipeline) {
+            Set<String> inputNodes = relationship.getInputNodes();
+            Set<String> outputNodes = relationship.getOutputNodes();
+            for (String inputNode : inputNodes) {
+                for (String outputNode : outputNodes) {
+                    priorityMap.computeIfAbsent(inputNode, key -> Sets.newHashSet()).add(outputNode);
+                    Set<String> priorityNodesOfOutput = priorityMap.get(outputNode);
+                    if (CollectionUtils.isEmpty(priorityNodesOfOutput)) {
+                        continue;
+                    }
+                    // A direct back edge, or any longer path from the output back to the input, closes a circle
+                    if (priorityNodesOfOutput.contains(inputNode)
+                            || isReach(priorityMap, priorityNodesOfOutput, inputNode)) {
+                        return Pair.of(true, Pair.of(inputNode, outputNode));
+                    }
+                }
+            }
+        }
+        return Pair.of(false, null);
+    }
+
+    /**
+     * Breadth-first search from {@code inputs} for a node that has {@code output} as a direct successor.
+     *
+     * @param paths adjacency map from a node to its direct successors
+     * @param inputs nodes to start the search from
+     * @param output node to look for
+     * @return true if {@code output} is a direct successor of a start node or of any node reachable from one
+     */
+    private boolean isReach(Map<String, Set<String>> paths, Set<String> inputs, String output) {
+        Queue<String> queue = new LinkedList<>(inputs);
+        // Nodes already queued once; checking this set (not only the start nodes) keeps every node from
+        // being queued more than once when several paths lead to it
+        Set<String> visitedNodes = new HashSet<>(inputs);
+        while (!queue.isEmpty()) {
+            String node = queue.remove();
+            Set<String> postNodes = paths.get(node);
+            if (postNodes == null) {
+                continue;
+            }
+            if (postNodes.contains(output)) {
+                return true;
+            }
+            for (String postNode : postNodes) {
+                if (visitedNodes.add(postNode)) {
+                    queue.add(postNode);
+                }
+            }
+        }
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
similarity index 60%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
index 5022d48d4..263f17a3f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
@@ -15,18 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service;
+package org.apache.inlong.manager.common.pojo.transform;
 
-import org.apache.inlong.manager.test.BaseTest;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.test.context.SpringBootTest;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
 
-@SpringBootApplication
-@SpringBootTest(classes = ServiceBaseTest.class)
-public class ServiceBaseTest extends BaseTest {
+/**
+ * Base class of transform definitions; concrete definitions (e.g. FilterDefinition) set their own transform type.
+ */
+@Data
+public abstract class TransformDefinition {
 
-    public final String globalGroupId = "b_group1";
-    public final String globalStreamId = "stream1";
-    public final String globalOperator = "admin";
+    /**
+     * Type of this transform, assigned in the constructor of the concrete definition
+     */
+    protected TransformType transformType;
 
+    /**
+     * Comparison operations, referenced by FilterDefinition.FilterRule
+     */
+    @JsonFormat
+    public enum OperationType {
+        lt, le, eq, ne, ge, gt, is_null, not_null
+    }
+
+    /**
+     * Script languages, referenced by FilterDefinition.ScriptBase
+     */
+    @JsonFormat
+    public enum ScriptType {
+        PYTHON, JAVA
+    }
+
+    /**
+     * Logical relations between rules, referenced by FilterDefinition.FilterRule
+     */
+    @JsonFormat
+    public enum RuleRelation {
+        AND, OR
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
similarity index 53%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
index 1cc9bf76f..79b83c5c9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.source;
+package org.apache.inlong.manager.common.pojo.transform;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
@@ -26,14 +24,13 @@ import lombok.Data;
 import javax.validation.constraints.NotNull;
 
 /**
- * Request of source
+ * Request of transform
  */
 @Data
-@ApiModel("Request of source")
-@JsonTypeInfo(use = Id.NAME, visible = true, property = "sourceType")
-public class SourceRequest {
+@ApiModel("Request of stream transform")
+public class TransformRequest {
 
-    private Integer id;
+    private int id;
 
     @NotNull
     @ApiModelProperty("Inlong group id")
@@ -44,29 +41,26 @@ public class SourceRequest {
     private String inlongStreamId;
 
     @NotNull
-    @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
-    private String sourceType;
+    @ApiModelProperty("Transform name, unique in one stream")
+    private String transformName;
 
     @NotNull
-    @ApiModelProperty("Source name, unique in one stream")
-    private String sourceName;
+    @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+    private String transformType;
 
-    @ApiModelProperty("Mac uuid of the agent running the task")
-    private String uuid;
-
-    @ApiModelProperty("Id of the source server")
-    private Integer serverId;
-
-    @ApiModelProperty("Id of the cluster that collected this source")
-    private Integer clusterId;
+    @NotNull
+    @ApiModelProperty("Pre node names of transform in this stream, join by ','")
+    private String preNodeNames = "";
 
-    @ApiModelProperty("Serialization type, support: csv, json, canal, avro, etc")
-    private String serializationType;
+    @NotNull
+    @ApiModelProperty("Post node names of transform in this stream, join by ','")
+    private String postNodeNames = "";
 
-    @ApiModelProperty("Snapshot of the source task")
-    private String snapshot;
+    @NotNull
+    @ApiModelProperty("Transform definition in json type")
+    private String transformDefinition;
 
-    @ApiModelProperty("Version")
+    @ApiModelProperty("Version of transform")
     private Integer version;
-
 }
+
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java
new file mode 100644
index 000000000..abb17b654
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+/**
+ * Response of transform
+ */
+@Data
+@ApiModel("Response of the stream transform")
+public class TransformResponse {
+
+    /**
+     * Id of the transform
+     */
+    private int id;
+
+    @ApiModelProperty("Inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty("Inlong stream id")
+    private String inlongStreamId;
+
+    @ApiModelProperty("Transform name, unique in one stream")
+    private String transformName;
+
+    @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+    private String transformType;
+
+    @ApiModelProperty("Pre node names of transform in this stream")
+    private String preNodeNames;
+
+    @ApiModelProperty("Post node names of transform in this stream")
+    private String postNodeNames;
+
+    @ApiModelProperty("Transform definition in json type")
+    private String transformDefinition;
+
+    @ApiModelProperty("Version of transform")
+    private Integer version;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java
new file mode 100644
index 000000000..fbb83f85c
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.deduplication;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A class to define the operation that de-duplicates stream records within a time duration.
+ * DupFields are the fields used to judge whether stream records are duplicates.
+ * TimingField is the event time field of stream records.
+ * Interval and timeUnit together define the time interval
+ * within which duplicate records are handled.
+ */
+@Data
+@Builder
+public class DeDuplicationDefinition extends TransformDefinition {
+
+    /**
+     * Create a de_duplication definition; the transform type is always {@link TransformType#DE_DUPLICATION}.
+     */
+    public DeDuplicationDefinition(List<StreamField> dupFields,
+            StreamField timingField,
+            long interval,
+            TimeUnit timeUnit,
+            DeDuplicationStrategy deDuplicationStrategy) {
+        this.transformType = TransformType.DE_DUPLICATION;
+        this.dupFields = dupFields;
+        this.timingField = timingField;
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+        this.deDuplicationStrategy = deDuplicationStrategy;
+    }
+
+    /**
+     * Duplicate fields for de_duplication transform
+     */
+    private List<StreamField> dupFields;
+
+    /**
+     * Event time field for de_duplication transform
+     */
+    private StreamField timingField;
+
+    /**
+     * Time interval for de_duplication transform
+     */
+    private long interval;
+
+    /**
+     * TimeUnit for de_duplication transform
+     */
+    private TimeUnit timeUnit;
+
+    /**
+     * Available strategies for handling duplicate records
+     */
+    @JsonFormat
+    public enum DeDuplicationStrategy {
+        REMOVE_ALL, RESERVE_FIRST, RESERVE_LAST
+    }
+
+    /**
+     * Strategy for de_duplication operation
+     */
+    private DeDuplicationStrategy deDuplicationStrategy;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
new file mode 100644
index 000000000..983b976c8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.filter;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operation to filter stream records by different modes.
+ * Rule mode is more recommended than script mode
+ */
+@Data
+public class FilterDefinition extends TransformDefinition {
+
+    /**
+     * Create a filter definition in RULE mode, driven by the given filter rules.
+     */
+    public FilterDefinition(FilterStrategy filterStrategy, List<FilterRule> filterRules) {
+        this.transformType = TransformType.FILTER;
+        this.filterStrategy = filterStrategy;
+        this.filterMode = FilterMode.RULE;
+        this.filterRules = filterRules;
+    }
+
+    /**
+     * Create a filter definition in SCRIPT mode, driven by the given script.
+     */
+    public FilterDefinition(FilterStrategy filterStrategy, ScriptBase scriptBase) {
+        this.transformType = TransformType.FILTER;
+        this.filterStrategy = filterStrategy;
+        this.filterMode = FilterMode.SCRIPT;
+        this.scriptBase = scriptBase;
+    }
+
+    @JsonFormat
+    public enum FilterStrategy {
+        RETAIN, REMOVE
+    }
+
+    @JsonFormat
+    public enum FilterMode {
+        RULE, SCRIPT
+    }
+
+    /**
+     * Strategy for Filter transform
+     */
+    private FilterStrategy filterStrategy;
+
+    /**
+     * Mode for Filter transform
+     */
+    private FilterMode filterMode;
+
+    @Data
+    @AllArgsConstructor
+    public static class TargetValue {
+
+        /**
+         * If target value is constant, set targetConstant, or set targetField if not;
+         * NOTE(review): Lombok generates isConstant()/setConstant() for this field, so bean-based
+         * serializers will likely see the property as "constant" - confirm this is intended.
+         */
+        private boolean isConstant;
+
+        private StreamField targetField;
+
+        private String targetConstant;
+    }
+
+    /**
+     * Filter rule is about relationship between sourceField and targetValue;
+     * such as 'a >= b' or 'a is not null'
+     */
+    @Data
+    @AllArgsConstructor
+    public static class FilterRule {
+
+        private StreamField sourceField;
+
+        private OperationType operationType;
+
+        private TargetValue targetValue;
+
+        private RuleRelation relationWithPost;
+    }
+
+    /**
+     * Rules for Filter transform, set by the RULE mode constructor
+     */
+    private List<FilterRule> filterRules;
+
+    @Data
+    @AllArgsConstructor
+    public static class ScriptBase {
+
+        private ScriptType scriptType;
+
+        private String script;
+    }
+
+    /**
+     * Script for Filter transform, set by the SCRIPT mode constructor
+     */
+    private ScriptBase scriptBase;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java
new file mode 100644
index 000000000..a682c5ed7
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define the operation that joins two stream nodes into one by the given join fields and join mode.
+ */
+@Data
+@Builder
+public class JoinerDefinition extends TransformDefinition {
+
+    /**
+     * Create a joiner definition; the transform type is always {@link TransformType#JOINER}.
+     */
+    public JoinerDefinition(StreamNode leftNode,
+            StreamNode rightNode,
+            List<StreamField> leftJoinFields,
+            List<StreamField> rightJoinFields,
+            JoinMode joinMode) {
+        this.transformType = TransformType.JOINER;
+        this.leftNode = leftNode;
+        this.rightNode = rightNode;
+        this.leftJoinFields = leftJoinFields;
+        this.rightJoinFields = rightJoinFields;
+        this.joinMode = joinMode;
+    }
+
+    /**
+     * Left node for join
+     */
+    private StreamNode leftNode;
+
+    /**
+     * Right node for join
+     */
+    private StreamNode rightNode;
+
+    /**
+     * Join streamFields from left node
+     */
+    private List<StreamField> leftJoinFields;
+
+    /**
+     * Join streamFields from right node
+     */
+    private List<StreamField> rightJoinFields;
+
+    /**
+     * Available join modes
+     */
+    @JsonFormat
+    public enum JoinMode {
+        LEFT_JOIN, RIGHT_JOIN, INNER_JOIN
+    }
+
+    /**
+     * Join mode for join transform
+     */
+    private JoinMode joinMode;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java
new file mode 100644
index 000000000..8bcde2005
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.replacer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define the operation that replaces stream fields in stream records according to the defined ReplaceRules.
+ */
+@Data
+@Builder
+public class StringReplacerDefinition extends TransformDefinition {
+
+    public StringReplacerDefinition(List<ReplaceRule> replaceRules) {
+        this.transformType = TransformType.STRING_REPLACER;
+        this.replaceRules = replaceRules;
+    }
+
+    public enum ReplaceMode {
+        RELACE_ALL, RELACE_FIRST
+    }
+
+    /**
+     * ReplaceRule defines a replace action on a string field;
+     * if the field value matches regex, all matches or only the first one (per mode) are replaced by targetValue.
+     */
+    @Data
+    @AllArgsConstructor
+    public static class ReplaceRule {
+
+        private StreamField sourceField;
+
+        private String regex;
+
+        private String targetValue;
+
+        private ReplaceMode mode;
+
+        private boolean isCaseSensitive;
+    }
+
+    private List<ReplaceRule> replaceRules;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
new file mode 100644
index 000000000..5d27de018
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define the operation that splits fields according to the defined SplitRules.
+ */
+@Data
+@Builder
+public class SplitterDefinition extends TransformDefinition {
+
+    public SplitterDefinition(List<SplitRule> splitRules) {
+        this.transformType = TransformType.SPLITTER;
+        this.splitRules = splitRules;
+    }
+
+    /**
+     * SplitRule defines a split action as follows:
+     * sourceField is split into targetFields by the separator
+     */
+    @Data
+    @AllArgsConstructor
+    public static class SplitRule {
+
+        /**
+         * Field to split;
+         */
+        private StreamField sourceField;
+
+        /**
+         * String separator used to split sourceField;
+         */
+        private String seperator;
+
+        /**
+         * Fields generated when sourceField is split
+         * Use sourceName_0, sourceName_1, sourceName_2 if not set
+         */
+        private List<String> targetFields;
+    }
+
+    /**
+     * Split rules for transform;
+     */
+    private List<SplitRule> splitRules;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index 2de3aa22b..a2717f9e7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -19,73 +19,56 @@ package org.apache.inlong.manager.common.settings;
 
 public class InlongGroupSettings {
 
-    public static String PULSAR_ADMIN_URL = "pulsar.adminUrl";
+    public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
 
-    public static String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
-
-    public static String PULSAR_AUTHENTICATION = "pulsar.authentication";
-
-    public static String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
-
-    public static String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
-
-    public static String TUBE_MANAGER_URL = "tube.manager.url";
-
-    public static String TUBE_MASTER_URL = "tube.master.url";
-
-    public static String TUBE_CLUSTER_ID = "tube.cluster.id";
+    public static final String DATA_FLOW = "dataFlow";
 
     /**
-     * oceanus need param start
+     * config of pulsar
      */
-    public static String SORT_JOB_ID = "sort.job.id";
-
-    public static String ENDPOINT = "endpoint";
-
-    public static String DATA_FLOW = "dataFlow";
-
-    public static String SECRET_ID = "secretId";
-
-    public static String SECRET_KEY = "secretKey";
+    public static final String PULSAR_ADMIN_URL = "pulsar.adminUrl";
 
-    public static String REGION = "region";
+    public static final String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
 
-    public static String CLUSTER_ID = "clusterId";
+    public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
 
-    public static String FS_ABSTRACT_FILE_SYSTEM_COSN_IMPL = "fs.AbstractFileSystem.cosn.impl";
+    public static final String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
 
-    public static String FS_COSN_IMPL = "fs.cosn.impl";
+    public static final String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
 
-    public static String FS_COSN_BUCKET_REGION = "fs.cosn.bucket.region";
-
-    public static String FS_COSN_USERINFO_APPID = "fs.cosn.userinfo.appid";
-
-    public static String FS_COSN_USERINFO_SECRET_ID = "fs.cosn.userinfo.secretId";
-
-    public static String FS_COSN_USERINFO_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+    /**
+     * config of tube mq
+     */
+    public static final String TUBE_MANAGER_URL = "tube.manager.url";
 
-    public static String FS_ABSTRACT_FILE_SYSTEM_OFS_IMPL = "fs.AbstractFileSystem.ofs.impl";
+    public static final String TUBE_MASTER_URL = "tube.master.url";
 
-    public static String FS_OFS_IMPL = "fs.ofs.impl";
+    public static final String TUBE_CLUSTER_ID = "tube.cluster.id";
 
-    public static String FS_OFS_TMP_CACHE_DIR = "fs.ofs.tmp.cache.dir";
+    /**
+     * config of dataproxy
+     */
+    public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
 
-    public static String FS_OFS_USER_APPID = "fs.ofs.user.appid";
+    /**
+     * config of sort
+     */
+    public static final String SORT_JOB_ID = "sort.job.id";
 
-    public static String SORT_TYPE = "sort.type";
+    public static final String SORT_TYPE = "sort.type";
 
-    public static String DEFAULT_SORT_TYPE = "flink";
+    public static final String DEFAULT_SORT_TYPE = "flink";
 
-    public static String SORT_NAME = "sort.name";
+    public static final String SORT_NAME = "sort.name";
 
-    public static String SORT_URL = "sort.url";
+    public static final String SORT_URL = "sort.url";
 
-    public static String SORT_AUTHENTICATION = "sort.authentication";
+    public static final String SORT_AUTHENTICATION = "sort.authentication";
 
-    public static String SORT_AUTHENTICATION_TYPE = "sort.authentication.type";
+    public static final String SORT_AUTHENTICATION_TYPE = "sort.authentication.type";
 
-    public static String DEFAULT_SORT_AUTHENTICATION_TYPE = "secret_and_token";
+    public static final String DEFAULT_SORT_AUTHENTICATION_TYPE = "secret_and_token";
 
-    public static String SORT_PROPERTIES = "sort.properties";
+    public static final String SORT_PROPERTIES = "sort.properties";
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java
new file mode 100644
index 000000000..dc7c3de86
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import com.google.gson.Gson;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.replacer.StringReplacerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition;
+
+public class TransformDefinitionUtils {
+
+    private static Gson gson = new Gson();
+
+    public static TransformDefinition parseTransformDefinition(String transformDefinition,
+            TransformType transformType) {
+        switch (transformType) {
+            case FILTER:
+                return gson.fromJson(transformDefinition, FilterDefinition.class);
+            case JOINER:
+                return gson.fromJson(transformDefinition, JoinerDefinition.class);
+            case SPLITTER:
+                return gson.fromJson(transformDefinition, SplitterDefinition.class);
+            case DE_DUPLICATION:
+                return gson.fromJson(transformDefinition, DeDuplicationDefinition.class);
+            case STRING_REPLACER:
+                return gson.fromJson(transformDefinition, StringReplacerDefinition.class);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported transformType for %s", transformType));
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
new file mode 100644
index 000000000..456f67578
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamPipelineTest {
+
+    @Test
+    public void testCheckHasCircle() {
+        StreamPipeline streamPipeline = new StreamPipeline();
+        streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
+        streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("C"), Sets.newHashSet("D")));
+        streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
+        streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("F"), Sets.newHashSet("G")));
+        streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
+        Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
+        Assert.assertTrue(circleState.getLeft());
+        Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getLeft()));
+        Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getRight()));
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java
new file mode 100644
index 000000000..763ac04d5
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition.OperationType;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition.RuleRelation;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition.DeDuplicationStrategy;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterRule;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.TargetValue;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TransformDefinitionTest {
+
+    public class BlankStreamNode extends StreamNode {
+
+    }
+
+    public static Gson gson = new Gson();
+
+    @Test
+    public void testParseDeDuplicationDefinition() {
+        List<StreamField> streamFields = createStreamFields();
+        StreamField timingField = new StreamField(2, FieldType.TIMESTAMP, "event_time", null, null);
+        DeDuplicationDefinition deDuplicationDefinition = new DeDuplicationDefinition(streamFields, timingField, 100,
+                TimeUnit.MICROSECONDS, DeDuplicationStrategy.RESERVE_FIRST);
+        String definitionJson = gson.toJson(deDuplicationDefinition);
+        DeDuplicationDefinition parsedDefinition = gson.fromJson(definitionJson, DeDuplicationDefinition.class);
+        Assert.assertEquals(deDuplicationDefinition, parsedDefinition);
+    }
+
+    @Test
+    public void testParseFilterDefinition() {
+        List<FilterRule> filterRules = createFilterRule();
+        FilterDefinition filterDefinition = new FilterDefinition(FilterStrategy.RETAIN, filterRules);
+        String definitionJson = gson.toJson(filterDefinition);
+        FilterDefinition parsedDefinition = gson.fromJson(definitionJson, FilterDefinition.class);
+        Assert.assertEquals(filterDefinition, parsedDefinition);
+    }
+
+    @Test
+    public void testJoinerDefinition() {
+        List<StreamField> streamFields = createStreamFields();
+        StreamNode leftNode = new BlankStreamNode();
+        leftNode.setFields(streamFields);
+        StreamNode rightNode = new BlankStreamNode();
+        rightNode.setFields(streamFields);
+        JoinerDefinition joinerDefinition = new JoinerDefinition(leftNode, rightNode, streamFields, streamFields,
+                JoinMode.INNER_JOIN);
+        String definitionJson = gson.toJson(joinerDefinition);
+        JoinerDefinition parsedDefinition = gson.fromJson(definitionJson, JoinerDefinition.class);
+        Assert.assertEquals(joinerDefinition, parsedDefinition);
+    }
+
+    private List<StreamField> createStreamFields() {
+        List<StreamField> streamFieldList = Lists.newArrayList();
+        streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
+        streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
+        return streamFieldList;
+    }
+
+    private List<FilterRule> createFilterRule() {
+        List<FilterRule> filterRules = Lists.newArrayList();
+        filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING, "name", null, null),
+                OperationType.not_null, null, RuleRelation.OR));
+        filterRules.add(new FilterRule(new StreamField(1, FieldType.INT, "age", null, null),
+                OperationType.gt, new TargetValue(true, null, "50"), null));
+        return filterRules;
+    }
+
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
similarity index 58%
copy from inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index 21bb90d30..a9d3f9f60 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -15,23 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.cli;
+package org.apache.inlong.manager.dao.entity;
 
 import lombok.Data;
-import org.apache.inlong.manager.client.api.InlongGroupConf;
-import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
-import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSource;
 
-import java.util.List;
+import java.io.Serializable;
 
 @Data
-public class CreateGroupConf {
-
-    private InlongGroupConf groupConf;
-    private InlongStreamConf streamConf;
-    private List<StreamField> streamFieldList;
-    private StreamSource streamSource;
-    private StreamSink streamSink;
-}
+public class StreamSourceFieldEntity implements Serializable {
+    private Integer id;
+
+    private String inlongGroupId;
+
+    private String inlongStreamId;
+
+    private Integer sourceId;
+
+    private String sourceType;
+
+    private String fieldName;
+
+    private String fieldValue;
+
+    private String preExpression;
+
+    private String fieldType;
+
+    private String fieldComment;
+
+    private Short isMetaField;
+
+    private String fieldFormat;
+
+    private Short rankNum;
+
+    private Integer isDeleted;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
similarity index 55%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
index 0c18cd558..b6efc4d60 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
@@ -15,30 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.dao.entity;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
 
-import java.util.List;
-import java.util.Map;
+import java.io.Serializable;
+import java.util.Date;
 
 @Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public class StreamTransformEntity implements Serializable {
 
-    @ApiModelProperty(value = "DataSink name", required = true)
-    private String sinkName;
+    private static final long serialVersionUID = 1L;
 
-    @ApiModelProperty("Other properties if need")
-    private Map<String, Object> properties;
+    private int id;
 
-    public abstract SinkType getSinkType();
+    private String inlongGroupId;
 
-    public abstract List<SinkField> getSinkFields();
+    private String inlongStreamId;
 
-    public abstract DataFormat getDataFormat();
+    private String transformName;
 
-}
+    private String transformType;
+
+    private String preNodeNames;
+
+    private String postNodeNames;
+
+    private String transformDefinition;
+
+    private Integer version;
+
+    private Integer isDeleted;
+
+    private String creator;
+
+    private String modifier;
+
+    private Date createTime;
+
+    private Date modifyTime;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
index 95f3827d7..c9a607823 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.pojo.commonserver.CommonDbServerPageRequ
 import org.apache.inlong.manager.dao.entity.CommonDbServerEntity;
 import org.springframework.stereotype.Repository;
 
+@Deprecated
 @Repository
 public interface CommonDbServerEntityMapper {
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
index b99090bb8..6192420a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.springframework.stereotype.Repository;
 
+@Deprecated
 @Repository
 public interface CommonFileServerEntityMapper {
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
index 9c6d1d81e..c0b1661a2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
@@ -21,6 +21,7 @@ import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.dao.entity.DBCollectorDetailTaskEntity;
 import org.springframework.stereotype.Repository;
 
+@Deprecated
 @Repository
 public interface DBCollectorDetailTaskMapper {
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
similarity index 51%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index b99090bb8..340a93160 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -17,31 +17,46 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import org.apache.inlong.manager.common.pojo.commonserver.CommonFileServerPageRequest;
-import org.apache.inlong.manager.dao.entity.CommonFileServerEntity;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
+import org.springframework.stereotype.Repository;
 
 import java.util.List;
 
-import org.springframework.stereotype.Repository;
-
 @Repository
-public interface CommonFileServerEntityMapper {
+public interface StreamSourceFieldEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
-    int insert(CommonFileServerEntity record);
+    int insert(StreamSourceFieldEntity record);
 
-    int insertSelective(CommonFileServerEntity record);
+    int insertSelective(StreamSourceFieldEntity record);
 
-    CommonFileServerEntity selectByPrimaryKey(Integer id);
+    /**
+     * Select undeleted source fields by source id.
+     *
+     * @param sourceId
+     * @return
+     */
+    List<StreamSourceFieldEntity> selectBySourceId(@Param("sourceId") Integer sourceId);
 
-    int updateByPrimaryKeySelective(CommonFileServerEntity record);
+    int updateByPrimaryKeySelective(StreamSourceFieldEntity record);
 
-    int updateByPrimaryKey(CommonFileServerEntity record);
+    int updateByPrimaryKey(StreamSourceFieldEntity record);
 
-    List<CommonFileServerEntity> selectByUsernameAndIpPort(String username, String ip, int port);
+    /**
+     * Insert all fields in the given list
+     *
+     * @param fieldList
+     */
+    void insertAll(@Param("list") List<StreamSourceFieldEntity> fieldList);
 
-    List<CommonFileServerEntity> selectAll();
+    /**
+     * Delete all fields by sourceId
+     *
+     * @param sourceId
+     * @return
+     */
+    int deleteAll(@Param("sourceId") Integer sourceId);
 
-    List<CommonFileServerEntity> selectByCondition(CommonFileServerPageRequest request);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
similarity index 60%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
index 9c6d1d81e..88edd5e1d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
@@ -18,16 +18,26 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.DBCollectorDetailTaskEntity;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
+
 @Repository
-public interface DBCollectorDetailTaskMapper {
+public interface StreamTransformEntityMapper {
+
+    int deleteById(Integer id);
+
+    int insert(StreamTransformEntity record);
+
+    int insertSelective(StreamTransformEntity record);
+
+    StreamTransformEntity selectById(Integer id);
 
-    DBCollectorDetailTaskEntity selectOneByState(int state);
+    List<StreamTransformEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+            @Param("transformName") String transformName);
 
-    DBCollectorDetailTaskEntity selectByTaskId(int taskId);
+    int updateByIdSelective(StreamTransformEntity record);
 
-    int changeState(@Param("id") int id, @Param("offset") int offset,
-                    @Param("oldState") int oldState, @Param("newState") int newState);
+    int updateById(StreamTransformEntity record);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 1aae414bb..2378d1507 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -67,6 +67,12 @@
         </javaClientGenerator>
 
         <!-- Which entities to generate -->
+        <table tableName="stream_source_field" domainObjectName="StreamSourceFieldEntity"
+                enableSelectByPrimaryKey="true" enableUpdateByPrimaryKey="true"
+                enableDeleteByPrimaryKey="true" enableInsert="true"
+                enableCountByExample="false" enableDeleteByExample="false"
+                enableSelectByExample="false" enableUpdateByExample="false">
+        </table>
         <!--<table tableName="user" domainObjectName="UserEntity"
           enableSelectByPrimaryKey="true"
           enableUpdateByPrimaryKey="true"
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
new file mode 100644
index 000000000..123384e72
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper">
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+        <result column="source_id" jdbcType="INTEGER" property="sourceId"/>
+        <result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
+        <result column="field_name" jdbcType="VARCHAR" property="fieldName"/>
+        <result column="field_value" jdbcType="VARCHAR" property="fieldValue"/>
+        <result column="pre_expression" jdbcType="VARCHAR" property="preExpression"/>
+        <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
+        <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
+        <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
+        <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id
+        , inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value,
+    pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, 
+    is_deleted
+    </sql>
+    <select id="selectBySourceId" resultMap="BaseResultMap"> <!-- non-deleted fields of one source, mapped explicitly -->
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source_field
+        where source_id = #{sourceId,jdbcType=INTEGER}
+        and is_deleted = 0
+    </select>
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from stream_source_field
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+        insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
+                                         source_id, source_type, field_name,
+                                         field_value, pre_expression, field_type,
+                                         field_comment, is_meta_field, field_format,
+                                         rank_num, is_deleted)
+        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
+                #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
+                #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+    </insert>
+    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+        insert into stream_source_field
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                id,
+            </if>
+            <if test="inlongGroupId != null">
+                inlong_group_id,
+            </if>
+            <if test="inlongStreamId != null">
+                inlong_stream_id,
+            </if>
+            <if test="sourceId != null">
+                source_id,
+            </if>
+            <if test="sourceType != null">
+                source_type,
+            </if>
+            <if test="fieldName != null">
+                field_name,
+            </if>
+            <if test="fieldValue != null">
+                field_value,
+            </if>
+            <if test="preExpression != null">
+                pre_expression,
+            </if>
+            <if test="fieldType != null">
+                field_type,
+            </if>
+            <if test="fieldComment != null">
+                field_comment,
+            </if>
+            <if test="isMetaField != null">
+                is_meta_field,
+            </if>
+            <if test="fieldFormat != null">
+                field_format,
+            </if>
+            <if test="rankNum != null">
+                rank_num,
+            </if>
+            <if test="isDeleted != null">
+                is_deleted,
+            </if>
+        </trim>
+        <trim prefix="values (" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                #{id,jdbcType=INTEGER},
+            </if>
+            <if test="inlongGroupId != null">
+                #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongStreamId != null">
+                #{inlongStreamId,jdbcType=VARCHAR},
+            </if>
+            <if test="sourceId != null">
+                #{sourceId,jdbcType=INTEGER},
+            </if>
+            <if test="sourceType != null">
+                #{sourceType,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldName != null">
+                #{fieldName,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldValue != null">
+                #{fieldValue,jdbcType=VARCHAR},
+            </if>
+            <if test="preExpression != null">
+                #{preExpression,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldType != null">
+                #{fieldType,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldComment != null">
+                #{fieldComment,jdbcType=VARCHAR},
+            </if>
+            <if test="isMetaField != null">
+                #{isMetaField,jdbcType=SMALLINT},
+            </if>
+            <if test="fieldFormat != null">
+                #{fieldFormat,jdbcType=VARCHAR},
+            </if>
+            <if test="rankNum != null">
+                #{rankNum,jdbcType=SMALLINT},
+            </if>
+            <if test="isDeleted != null">
+                #{isDeleted,jdbcType=INTEGER},
+            </if>
+        </trim>
+    </insert>
+    <update id="updateByPrimaryKeySelective"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+        update stream_source_field
+        <set>
+            <if test="inlongGroupId != null">
+                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongStreamId != null">
+                inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+            </if>
+            <if test="sourceId != null">
+                source_id = #{sourceId,jdbcType=INTEGER},
+            </if>
+            <if test="sourceType != null">
+                source_type = #{sourceType,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldName != null">
+                field_name = #{fieldName,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldValue != null">
+                field_value = #{fieldValue,jdbcType=VARCHAR},
+            </if>
+            <if test="preExpression != null">
+                pre_expression = #{preExpression,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldType != null">
+                field_type = #{fieldType,jdbcType=VARCHAR},
+            </if>
+            <if test="fieldComment != null">
+                field_comment = #{fieldComment,jdbcType=VARCHAR},
+            </if>
+            <if test="isMetaField != null">
+                is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+            </if>
+            <if test="fieldFormat != null">
+                field_format = #{fieldFormat,jdbcType=VARCHAR},
+            </if>
+            <if test="rankNum != null">
+                rank_num = #{rankNum,jdbcType=SMALLINT},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+        update stream_source_field
+        set inlong_group_id  = #{inlongGroupId,jdbcType=VARCHAR},
+            inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+            source_id        = #{sourceId,jdbcType=INTEGER},
+            source_type      = #{sourceType,jdbcType=VARCHAR},
+            field_name       = #{fieldName,jdbcType=VARCHAR},
+            field_value      = #{fieldValue,jdbcType=VARCHAR},
+            pre_expression   = #{preExpression,jdbcType=VARCHAR},
+            field_type       = #{fieldType,jdbcType=VARCHAR},
+            field_comment    = #{fieldComment,jdbcType=VARCHAR},
+            is_meta_field    = #{isMetaField,jdbcType=SMALLINT},
+            field_format     = #{fieldFormat,jdbcType=VARCHAR},
+            rank_num         = #{rankNum,jdbcType=SMALLINT},
+            is_deleted       = #{isDeleted,jdbcType=INTEGER}
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+
+    <insert id="insertAll" parameterType="java.util.List"> <!-- batch insert: one VALUES tuple per list element -->
+        insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
+        source_id, source_type, field_name,
+        field_value, pre_expression, field_type,
+        field_comment, is_meta_field, field_format,
+        rank_num, is_deleted)
+        values
+        <foreach collection="list" index="index" item="item" separator=","> <!-- placeholders must be qualified with the item name -->
+            (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.inlongStreamId,jdbcType=VARCHAR},
+            #{item.sourceId,jdbcType=INTEGER}, #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
+            #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR}, #{item.fieldType,jdbcType=VARCHAR},
+            #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
+            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
+        </foreach>
+    </insert>
+
+    <delete id="deleteAll">
+        delete
+        from stream_source_field
+        where source_id = #{sourceId,jdbcType=INTEGER}
+    </delete>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
new file mode 100644
index 000000000..6cddcec46
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+    <result column="transform_name" jdbcType="VARCHAR" property="transformName" />
+    <result column="transform_type" jdbcType="VARCHAR" property="transformType" />
+    <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames" />
+    <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames" />
+    <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition" />
+    <result column="version" jdbcType="INTEGER" property="version" />
+    <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+    <result column="creator" jdbcType="VARCHAR" property="creator" />
+    <result column="modifier" jdbcType="VARCHAR" property="modifier" />
+    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+  </resultMap>
+  <sql id="Base_Column_List">
+    id, inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names, 
+    post_node_names, transform_definition, version, is_deleted, creator, modifier, create_time, 
+    modify_time
+  </sql>
+  <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+    select 
+    <include refid="Base_Column_List" />
+    from stream_transform
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <select id="selectByRelatedId" resultMap="BaseResultMap"> <!-- groupId always filters; streamId/transformName only when non-empty -->
+    select
+    <include refid="Base_Column_List"/>
+    from stream_transform
+    <where>
+      is_deleted = 0
+      and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+      <if test="streamId != null and streamId != ''">
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+      </if>
+      <if test="transformName != null and transformName != ''">
+        and transform_name = #{transformName, jdbcType=VARCHAR}
+      </if>
+    </where>
+  </select>
+  <delete id="deleteById" parameterType="java.lang.Integer">
+    delete from stream_transform
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+    insert into stream_transform (id, inlong_group_id, inlong_stream_id, 
+      transform_name, transform_type, pre_node_names, 
+      post_node_names, transform_definition, version, 
+      is_deleted, creator, modifier, 
+      create_time, modify_time)
+    values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, 
+      #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR}, 
+      #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER}, 
+      #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, 
+      #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+  </insert>
+  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+    insert into stream_transform
+    <trim prefix="(" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        id,
+      </if>
+      <if test="inlongGroupId != null">
+        inlong_group_id,
+      </if>
+      <if test="inlongStreamId != null">
+        inlong_stream_id,
+      </if>
+      <if test="transformName != null">
+        transform_name,
+      </if>
+      <if test="transformType != null">
+        transform_type,
+      </if>
+      <if test="preNodeNames != null">
+        pre_node_names,
+      </if>
+      <if test="postNodeNames != null">
+        post_node_names,
+      </if>
+      <if test="transformDefinition != null">
+        transform_definition,
+      </if>
+      <if test="version != null">
+        version,
+      </if>
+      <if test="isDeleted != null">
+        is_deleted,
+      </if>
+      <if test="creator != null">
+        creator,
+      </if>
+      <if test="modifier != null">
+        modifier,
+      </if>
+      <if test="createTime != null">
+        create_time,
+      </if>
+      <if test="modifyTime != null">
+        modify_time,
+      </if>
+    </trim>
+    <trim prefix="values (" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        #{id,jdbcType=INTEGER},
+      </if>
+      <if test="inlongGroupId != null">
+        #{inlongGroupId,jdbcType=VARCHAR},
+      </if>
+      <if test="inlongStreamId != null">
+        #{inlongStreamId,jdbcType=VARCHAR},
+      </if>
+      <if test="transformName != null">
+        #{transformName,jdbcType=VARCHAR},
+      </if>
+      <if test="transformType != null">
+        #{transformType,jdbcType=VARCHAR},
+      </if>
+      <if test="preNodeNames != null">
+        #{preNodeNames,jdbcType=VARCHAR},
+      </if>
+      <if test="postNodeNames != null">
+        #{postNodeNames,jdbcType=VARCHAR},
+      </if>
+      <if test="transformDefinition != null">
+        #{transformDefinition,jdbcType=VARCHAR},
+      </if>
+      <if test="version != null">
+        #{version,jdbcType=INTEGER},
+      </if>
+      <if test="isDeleted != null">
+        #{isDeleted,jdbcType=INTEGER},
+      </if>
+      <if test="creator != null">
+        #{creator,jdbcType=VARCHAR},
+      </if>
+      <if test="modifier != null">
+        #{modifier,jdbcType=VARCHAR},
+      </if>
+      <if test="createTime != null">
+        #{createTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="modifyTime != null">
+        #{modifyTime,jdbcType=TIMESTAMP},
+      </if>
+    </trim>
+  </insert>
+  <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+    update stream_transform
+    <set>
+      <if test="inlongGroupId != null">
+        inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+      </if>
+      <if test="inlongStreamId != null">
+        inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+      </if>
+      <if test="transformName != null">
+        transform_name = #{transformName,jdbcType=VARCHAR},
+      </if>
+      <if test="transformType != null">
+        transform_type = #{transformType,jdbcType=VARCHAR},
+      </if>
+      <if test="preNodeNames != null">
+        pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+      </if>
+      <if test="postNodeNames != null">
+        post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+      </if>
+      <if test="transformDefinition != null">
+        transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+      </if>
+      <if test="version != null">
+        version = #{version,jdbcType=INTEGER},
+      </if>
+      <if test="isDeleted != null">
+        is_deleted = #{isDeleted,jdbcType=INTEGER},
+      </if>
+      <if test="creator != null">
+        creator = #{creator,jdbcType=VARCHAR},
+      </if>
+      <if test="modifier != null">
+        modifier = #{modifier,jdbcType=VARCHAR},
+      </if>
+      <if test="createTime != null">
+        create_time = #{createTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="modifyTime != null">
+        modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+      </if>
+    </set>
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+    update stream_transform
+    set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+      transform_name = #{transformName,jdbcType=VARCHAR},
+      transform_type = #{transformType,jdbcType=VARCHAR},
+      pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+      post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+      transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+      version = #{version,jdbcType=INTEGER},
+      is_deleted = #{isDeleted,jdbcType=INTEGER},
+      creator = #{creator,jdbcType=VARCHAR},
+      modifier = #{modifier,jdbcType=VARCHAR},
+      create_time = #{createTime,jdbcType=TIMESTAMP},
+      modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index e446d7db7..15879aa30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -19,47 +19,26 @@ package org.apache.inlong.manager.service;
 
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.thirdparty.sort.util.FieldInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
-import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
-import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -71,12 +50,6 @@ public class CommonOperateService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CommonOperateService.class);
 
-    @Autowired
-    private ClusterBean clusterBean;
-    @Autowired
-    private InlongStreamService streamService;
-    @Autowired
-    private StreamSourceService streamSourceService;
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
@@ -95,14 +68,14 @@ public class CommonOperateService {
         Map<String, String> params;
 
         switch (key) {
-            case Constant.PULSAR_SERVICEURL: {
+            case InlongGroupSettings.PULSAR_SERVICE_URL: {
                 clusterEntity = getMQCluster(MQType.PULSAR);
                 if (clusterEntity != null) {
                     result = clusterEntity.getUrl();
                 }
                 break;
             }
-            case Constant.PULSAR_ADMINURL: {
+            case InlongGroupSettings.PULSAR_ADMIN_URL: {
                 clusterEntity = getMQCluster(MQType.PULSAR);
                 if (clusterEntity != null) {
                     params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
@@ -110,12 +83,12 @@ public class CommonOperateService {
                 }
                 break;
             }
-            case Constant.CLUSTER_TUBE_MANAGER:
-            case Constant.CLUSTER_TUBE_CLUSTER_ID:
-            case Constant.TUBE_MASTER_URL: {
+            case InlongGroupSettings.TUBE_MANAGER_URL:
+            case InlongGroupSettings.TUBE_CLUSTER_ID:
+            case InlongGroupSettings.TUBE_MASTER_URL: {
                 clusterEntity = getMQCluster(MQType.TUBE);
                 if (clusterEntity != null) {
-                    if (key.equals(Constant.TUBE_MASTER_URL)) {
+                    if (key.equals(InlongGroupSettings.TUBE_MASTER_URL)) {
                         result = clusterEntity.getUrl();
                     } else {
                         params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
@@ -136,7 +109,8 @@ public class CommonOperateService {
      * TODO Add data_proxy_cluster_name for query.
      */
     private ThirdPartyClusterEntity getMQCluster(MQType type) {
-        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(
+                InlongGroupSettings.CLUSTER_DATA_PROXY);
         if (CollectionUtils.isEmpty(clusterList)) {
             LOGGER.warn("no data proxy cluster found");
             return null;
@@ -166,7 +140,7 @@ public class CommonOperateService {
         Map<String, String> configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
         PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
                 clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
-        String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
+        String adminUrl = configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
         Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
         pulsarClusterInfo.setAdminUrl(adminUrl);
         pulsarClusterInfo.setType(clusterEntity.getType());
@@ -196,58 +170,4 @@ public class CommonOperateService {
 
         return inlongGroupEntity;
     }
-
-    /**
-     * Create dataflow info for sort.
-     */
-    public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
-        String groupId = sinkResponse.getInlongGroupId();
-        String streamId = sinkResponse.getInlongStreamId();
-        List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
-        if (CollectionUtils.isEmpty(sourceList)) {
-            throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
-                    groupId, streamId));
-        }
-
-        // Get all field info
-        List<FieldInfo> sourceFields = new ArrayList<>();
-        List<FieldInfo> sinkFields = new ArrayList<>();
-
-        // TODO Support more than one source and one sink
-        final SourceResponse sourceResponse = sourceList.get(0);
-        boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
-
-        List<FieldMappingUnit> mappingUnitList;
-        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
-        if (isAllMigration) {
-            mappingUnitList = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
-        } else {
-            mappingUnitList = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
-                    sinkResponse.getFieldList(), sourceFields, sinkFields);
-        }
-
-        FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
-
-        // Get source info
-        String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
-        PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
-        SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
-                groupInfo, streamInfo, sourceResponse, sourceFields);
-
-        // Get sink info
-        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse, sinkFields);
-
-        // Get transformation info
-        TransformationInfo transInfo = new TransformationInfo(fieldMappingRule);
-
-        // Get properties
-        Map<String, Object> properties = new HashMap<>();
-        if (MapUtils.isNotEmpty(sinkResponse.getProperties())) {
-            properties.putAll(sinkResponse.getProperties());
-        }
-        properties.put(Constant.DATA_FLOW_GROUP_ID_KEY, groupId);
-
-        return new DataFlowInfo(sinkResponse.getId(), sourceInfo, transInfo, sinkInfo, properties);
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
index 51579cb5c..80c00d900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
@@ -36,6 +36,7 @@ import java.util.Objects;
 
 @Service
 @Slf4j
+@Deprecated
 public class DBCollectorTaskServiceImpl implements DBCollectorTaskService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DBCollectorTaskServiceImpl.class);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index eefec8892..e4d248861 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -42,6 +42,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -174,7 +175,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     public InlongGroupInfo get(String groupId) {
         LOGGER.debug("begin to get inlong group info by groupId={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             LOGGER.error("inlong group not found by groupId={}", groupId);
@@ -200,7 +201,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         // For approved inlong group, encapsulate the cluster address of the middleware
         if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
             if (mqType == MQType.TUBE) {
-                groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
+                groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL));
             } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType.name());
                 groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
@@ -253,7 +254,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         LOGGER.debug("begin to update inlong group={}", groupRequest);
         Preconditions.checkNotNull(groupRequest, "inlong group is empty");
         String groupId = groupRequest.getInlongGroupId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
@@ -328,7 +329,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             propagation = Propagation.REQUIRES_NEW)
     public boolean updateStatus(String groupId, Integer status, String operator) {
         LOGGER.info("begin to update group status to [{}] by groupId={}, username={}", status, groupId, operator);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         InlongGroupEntity entity = groupMapper.selectByGroupIdForUpdate(groupId);
         if (entity == null) {
             LOGGER.error("inlong group not found by groupId={}", groupId);
@@ -352,7 +353,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     public boolean delete(String groupId, String operator) {
         LOGGER.debug("begin to delete inlong group, groupId={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
@@ -401,7 +402,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     public boolean exist(String groupId) {
         LOGGER.debug("begin to check inlong group, groupId={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         Integer count = groupMapper.selectIdentifierExist(groupId);
         LOGGER.info("success to check inlong group");
@@ -441,12 +442,12 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         if (mqType == MQType.TUBE) {
             // Tube Topic corresponds to inlong group one-to-one
             topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
-            topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
+            topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL));
         } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             // Pulsar's topic corresponds to the inlong stream one-to-one
             topicVO.setDsTopicList(streamService.getTopicList(groupId));
-            topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
-            topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
+            topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.PULSAR_ADMIN_URL));
+            topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.PULSAR_SERVICE_URL));
         } else {
             LOGGER.error("middleware type={} not supported", mqType);
             throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED);
@@ -465,7 +466,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         // Save the dataSchema, Topic and other information of the inlong group
         Preconditions.checkNotNull(approveInfo, "InlongGroupApproveRequest is empty");
         String groupId = approveInfo.getInlongGroupId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         String middlewareType = approveInfo.getMiddlewareType();
         Preconditions.checkNotNull(middlewareType, "Middleware type is empty");
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index d0a31d5b1..b13d63f7e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -35,10 +34,10 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -91,8 +90,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(request, "inlong stream info is empty");
         String groupId = request.getInlongGroupId();
         String streamId = request.getInlongStreamId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be added
         checkBizIsTempStatus(groupId);
@@ -122,8 +121,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public InlongStreamInfo get(String groupId, String streamId) {
         LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         if (streamEntity == null) {
@@ -141,7 +140,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Override
     public Boolean exist(String groupId, String streamId) {
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         return streamEntity != null;
     }
@@ -199,9 +198,9 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         LOGGER.debug("begin to update inlong stream info={}", request);
         Preconditions.checkNotNull(request, "inlong stream request is empty");
         String groupId = request.getInlongGroupId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         String streamId = request.getInlongStreamId();
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be modified
         InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
@@ -232,8 +231,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public Boolean delete(String groupId, String streamId, String operator) {
         LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         this.checkBizIsTempStatus(groupId);
@@ -274,7 +273,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public Boolean logicDeleteAll(String groupId, String operator) {
         LOGGER.debug("begin to delete all inlong stream by groupId={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         this.checkBizIsTempStatus(groupId);
@@ -304,7 +303,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public List<StreamBriefResponse> getBriefList(String groupId) {
         LOGGER.debug("begin to get inlong stream brief list by groupId={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         List<InlongStreamEntity> entityList = streamMapper.selectByGroupId(groupId);
         List<StreamBriefResponse> briefInfoList = CommonBeanUtils
@@ -400,7 +399,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             LOGGER.debug("begin to list full inlong stream page by {}", request);
         }
         Preconditions.checkNotNull(request, "request is empty");
-        Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         // 1. Query all valid data sources under groupId
         String groupId = request.getInlongGroupId();
@@ -459,7 +458,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public List<InlongStreamTopicResponse> getTopicList(String groupId) {
         LOGGER.debug("begin bo get topic list by group id={}", groupId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         List<InlongStreamTopicResponse> topicList = streamMapper.selectTopicList(groupId);
         LOGGER.debug("success to get topic list by groupId={}", groupId);
@@ -565,7 +564,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         for (InlongStreamFieldEntity entity : list) {
             entity.setInlongGroupId(groupId);
             entity.setInlongStreamId(streamId);
-            entity.setIsDeleted(Constant.UN_DELETED);
         }
         streamFieldMapper.insertAll(list);
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
index 5b6726b41..2a8b899fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
@@ -20,11 +20,7 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Date;
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
@@ -38,6 +34,11 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+
 @Service
 public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogEntity>
         implements StreamConfigLogService {
@@ -63,7 +64,7 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to list source page by " + request);
         }
-        Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
 
         if (request.getReportTime() == null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index b3fe99eb8..9ebcce253 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.InLongStringUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -95,7 +96,6 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         }
         Preconditions.checkNotNull(entity.getCreator(), "cluster creator is empty");
         entity.setCreateTime(new Date());
-        entity.setIsDeleted(Constant.UN_DELETED);
         thirdPartyClusterMapper.insert(entity);
         LOGGER.info("success to add a cluster");
         return entity.getId();
@@ -184,9 +184,10 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         if (StringUtils.isNotBlank(clusterName)) {
             entity = thirdPartyClusterMapper.selectByName(clusterName);
         } else {
-            List<ThirdPartyClusterEntity> list = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+            List<ThirdPartyClusterEntity> list = thirdPartyClusterMapper.selectByType(
+                    InlongGroupSettings.CLUSTER_DATA_PROXY);
             if (CollectionUtils.isEmpty(list)) {
-                LOGGER.warn("data proxy cluster not found by type=" + Constant.CLUSTER_DATA_PROXY);
+                LOGGER.warn("data proxy cluster not found by type=" + InlongGroupSettings.CLUSTER_DATA_PROXY);
                 return null;
             }
             entity = list.get(0);
@@ -196,7 +197,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
             LOGGER.warn("data proxy cluster not found by name={}", clusterName);
             return null;
         }
-        if (!Constant.CLUSTER_DATA_PROXY.equals(entity.getType())) {
+        if (!InlongGroupSettings.CLUSTER_DATA_PROXY.equals(entity.getType())) {
             LOGGER.warn("expected cluster type is DATA_PROXY, but found {}", entity.getType());
             return null;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f49f12508..e53b94246 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,29 +25,21 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.CommonOperateService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.stream.CreateStreamWorkflowDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -89,8 +81,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
-    @Autowired
-    private WorkflowService workflowService;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -114,9 +104,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
         // If the inlong group status is [Configuration Successful], then asynchronously initiate
         // the [Single inlong stream Resource Creation] workflow
-        if (GroupState.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
-            executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
-        }
+//        if (GroupState.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+//            executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
+//        }
 
         LOGGER.info("success to save sink info: {}", request);
         return id;
@@ -139,7 +129,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public List<SinkResponse> listSink(String groupId, String streamId) {
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
@@ -153,8 +143,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Query all sink information and encapsulate it in the result set
         List<SinkBriefResponse> summaryList = sinkMapper.selectSummary(groupId, streamId);
@@ -165,7 +155,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
-        Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSinkEntity> entityPage = sinkMapper.selectByCondition(request);
         Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -192,7 +182,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     public Boolean update(SinkRequest request, String operator) {
         LOGGER.info("begin to update sink info: {}", request);
         this.checkParams(request);
-        Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
@@ -206,9 +196,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
         // The inlong group status is [Configuration successful], then asynchronously initiate
         // the [Single inlong stream resource creation] workflow
-        if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
-            executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
-        }
+//        if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+//            executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
+//        }
         LOGGER.info("success to update sink info: {}", request);
         return true;
     }
@@ -228,7 +218,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     public Boolean delete(Integer id, String sinkType, String operator) {
         LOGGER.info("begin to delete sink by id={}, sinkType={}", id, sinkType);
-        Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
+        Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         // Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
@@ -251,8 +241,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Transactional(rollbackFor = Throwable.class)
     public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
@@ -281,8 +271,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Transactional(rollbackFor = Throwable.class)
     public Boolean deleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
@@ -333,7 +323,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         for (SinkApproveDTO dto : approveList) {
             // According to the sink type, save sink information
             String sinkType = dto.getSinkType();
-            Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
+            Preconditions.checkNotNull(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
 
             StreamSinkEntity entity = new StreamSinkEntity();
             entity.setId(dto.getId());
@@ -351,13 +341,15 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     }
 
     private void checkParams(SinkRequest request) {
-        Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
         String groupId = request.getInlongGroupId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         String streamId = request.getInlongStreamId();
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
         String sinkType = request.getSinkType();
-        Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
+        Preconditions.checkNotNull(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+        String sinkName = request.getSinkName();
+        Preconditions.checkNotNull(sinkName, ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage());
     }
 
     /**
@@ -365,39 +357,39 @@ public class StreamSinkServiceImpl implements StreamSinkService {
      *
      * @see CreateStreamWorkflowDefinition
      */
-    class WorkflowStartRunnable implements Runnable {
-
-        private final String operator;
-        private final InlongGroupEntity inlongGroupEntity;
-        private final String streamId;
-
-        public WorkflowStartRunnable(String operator, InlongGroupEntity inlongGroupEntity, String streamId) {
-            this.operator = operator;
-            this.inlongGroupEntity = inlongGroupEntity;
-            this.streamId = streamId;
-        }
-
-        @Override
-        public void run() {
-            String groupId = inlongGroupEntity.getInlongGroupId();
-            LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
-
-            InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
-            GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
-
-            workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
-            LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
-        }
-
-        /**
-         * Generate [Group Resource] form
-         */
-        private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo, String streamId) {
-            GroupResourceProcessForm form = new GroupResourceProcessForm();
-            form.setGroupInfo(groupInfo);
-            form.setInlongStreamId(streamId);
-            return form;
-        }
-    }
+//    class WorkflowStartRunnable implements Runnable {
+//
+//        private final String operator;
+//        private final InlongGroupEntity inlongGroupEntity;
+//        private final String streamId;
+//
+//        public WorkflowStartRunnable(String operator, InlongGroupEntity inlongGroupEntity, String streamId) {
+//            this.operator = operator;
+//            this.inlongGroupEntity = inlongGroupEntity;
+//            this.streamId = streamId;
+//        }
+//
+//        @Override
+//        public void run() {
+//            String groupId = inlongGroupEntity.getInlongGroupId();
+//            LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
+//
+//            InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
+//            GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
+//
+//            workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
+//            LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
+//        }
+//
+//        /**
+//         * Generate [Group Resource] form
+//         */
+//        private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo, String streamId) {
+//            GroupResourceProcessForm form = new GroupResourceProcessForm();
+//            form.setGroupInfo(groupInfo);
+//            form.setInlongStreamId(streamId);
+//            return form;
+//        }
+//    }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index 6aff31f3c..142be98eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -142,7 +142,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, existType));
 
         SinkResponse response = this.getFromEntity(entity, ClickHouseSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,7 +162,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
 
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, existType));
 
         ClickHouseSinkDTO dto = ClickHouseSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,7 +183,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, sinkType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index dc1b3aa09..c09ee5edf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -142,7 +142,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, existType));
 
         SinkResponse response = this.getFromEntity(entity, HiveSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,7 +162,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
 
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, existType));
 
         HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,7 +183,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, sinkType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index d09fa3ea3..9fd715491 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -136,7 +136,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, existType));
 
         SinkResponse response = this.getFromEntity(entity, IcebergSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -156,7 +156,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
 
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, existType));
 
         IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,7 +177,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, sinkType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index e8a4a169c..e72251868 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -140,7 +140,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, existType));
         SinkResponse response = this.getFromEntity(entity, KafkaSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
         List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
@@ -156,7 +156,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
         }
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, existType));
 
         KafkaSinkDTO dto = KafkaSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,7 +177,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
-                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, sinkType));
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index e99c00421..5d33e2581 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -18,18 +18,20 @@
 package org.apache.inlong.manager.service.source;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SourceState;
-import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -38,6 +40,7 @@ import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -49,6 +52,8 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperation.class);
     @Autowired
     protected StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    protected StreamSourceFieldEntityMapper sourceFieldMapper;
 
     /**
      * Setting the parameters of the latest entity.
@@ -91,15 +96,16 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
         } else {
             entity.setStatus(SourceState.SOURCE_NEW.getCode());
         }
-        entity.setIsDeleted(Constant.UN_DELETED);
         entity.setCreator(operator);
         entity.setModifier(operator);
         Date now = new Date();
         entity.setCreateTime(now);
         entity.setModifyTime(now);
+        entity.setIsDeleted(0);
         // get the ext params
         setTargetEntity(request, entity);
         sourceMapper.insert(entity);
+        saveFieldOpt(entity, request.getFieldList());
         return entity.getId();
     }
 
@@ -110,8 +116,14 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
-        return this.getFromEntity(entity, this::getResponse);
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+
+        SourceResponse sourceResponse = this.getFromEntity(entity, this::getResponse);
+        List<StreamSourceFieldEntity> sourceFieldEntities = sourceFieldMapper.selectBySourceId(id);
+        List<InlongStreamFieldInfo> fieldInfos = CommonBeanUtils.copyListProperties(sourceFieldEntities,
+                InlongStreamFieldInfo::new);
+        sourceResponse.setFieldList(fieldInfos);
+        return sourceResponse;
     }
 
     @Override
@@ -129,6 +141,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
         entity.setModifier(operator);
         entity.setModifyTime(new Date());
         sourceMapper.updateByPrimaryKeySelective(entity);
+        updateFieldOpt(entity, request.getFieldList());
         LOGGER.info("success to update source of type={}", request.getSourceType());
     }
 
@@ -189,5 +202,50 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
         curEntity.setIsDeleted(id);
         curEntity.setModifyTime(new Date());
         sourceMapper.updateByPrimaryKeySelective(curEntity);
+        sourceFieldMapper.deleteAll(id);
+        LOGGER.info("success to delete source={}", request);
+    }
+
+    /**
+     * Replace the saved fields of the given source with fieldInfos; an empty list changes nothing.
+     */
+    private void updateFieldOpt(StreamSourceEntity entity, List<InlongStreamFieldInfo> fieldInfos) {
+        Integer targetSourceId = entity.getId();
+        if (CollectionUtils.isNotEmpty(fieldInfos)) {
+            // Remove the fields currently stored for this source ...
+            sourceFieldMapper.deleteAll(targetSourceId);
+            // ... then batch save the requested ones in their place
+            this.saveFieldOpt(entity, fieldInfos);
+            LOGGER.info("success to update field");
+        }
+    }
+
+    /**
+     * Save fieldInfos as the fields of the given source; does nothing when the list is empty.
+     * A field whose comment is empty gets its field name as the comment.
+     */
+    private void saveFieldOpt(StreamSourceEntity entity, List<InlongStreamFieldInfo> fieldInfos) {
+        LOGGER.info("begin to save field={}", fieldInfos);
+        if (CollectionUtils.isEmpty(fieldInfos)) {
+            return;
+        }
+        final String groupId = entity.getInlongGroupId();
+        final String streamId = entity.getInlongStreamId();
+        final String sourceType = entity.getSourceType();
+        final Integer sourceId = entity.getId();
+        List<StreamSourceFieldEntity> rows = new ArrayList<>(fieldInfos.size());
+        fieldInfos.forEach(info -> {
+            StreamSourceFieldEntity row = CommonBeanUtils.copyProperties(info, StreamSourceFieldEntity::new);
+            if (StringUtils.isEmpty(row.getFieldComment())) {
+                row.setFieldComment(row.getFieldName());
+            }
+            row.setInlongGroupId(groupId);
+            row.setInlongStreamId(streamId);
+            row.setSourceId(sourceId);
+            row.setSourceType(sourceType);
+            rows.add(row);
+        });
+        sourceFieldMapper.insertAll(rows);
+        LOGGER.info("success to save source fields");
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 76d9b9f5e..89e0db727 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SourceState;
@@ -104,7 +103,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
     @Override
     public List<SourceResponse> listSource(String groupId, String streamId) {
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
@@ -117,7 +116,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
     @Override
     public PageInfo<? extends SourceListResponse> listByCondition(SourcePageRequest request) {
-        Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSourceEntity> entityList = sourceMapper.selectByCondition(request);
 
@@ -146,7 +145,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     public boolean update(SourceRequest request, String operator) {
         LOGGER.info("begin to update source info: {}", request);
         this.checkParams(request);
-        Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
+        Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
@@ -175,7 +174,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             isolation = Isolation.READ_COMMITTED)
     public boolean delete(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
-        Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
+        Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
 
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
@@ -231,8 +230,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             isolation = Isolation.READ_COMMITTED)
     public boolean logicDeleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
@@ -266,8 +265,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             isolation = Isolation.READ_COMMITTED)
     public boolean deleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to delete all source by groupId={}, streamId={}", groupId, streamId);
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
@@ -289,13 +288,15 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     private void checkParams(SourceRequest request) {
-        Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
         String groupId = request.getInlongGroupId();
-        Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         String streamId = request.getInlongStreamId();
-        Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
         String sourceType = request.getSourceType();
-        Preconditions.checkNotNull(sourceType, SourceType.SOURCE_TYPE_IS_EMPTY);
+        Preconditions.checkNotNull(sourceType, ErrorCodeEnum.SOURCE_TYPE_IS_NULL.getMessage());
+        String sourceName = request.getSourceName();
+        Preconditions.checkNotNull(sourceName, ErrorCodeEnum.SOURCE_NAME_IS_NULL.getMessage());
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
index cea98743f..af4028388 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
@@ -79,7 +79,7 @@ public class AutoPushSourceOperation extends AbstractSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
         AutoPushSourceDTO dto = AutoPushSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
index 3f9c35923..68e06760f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
@@ -92,7 +92,7 @@ public class BinlogSourceOperation extends AbstractSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
         BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
index 46e24fcc1..79e078be1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
@@ -90,7 +90,7 @@ public class FileSourceOperation extends AbstractSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
         FileSourceDTO dto = FileSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
index 3fd771d74..3d1521ad8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
@@ -92,7 +92,7 @@ public class KafkaSourceOperation extends AbstractSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
         KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
index a77d499eb..6133d4f50 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ReTryConfigBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -64,7 +64,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
 
         InlongGroupInfo groupInfo = groupService.get(groupId);
         String topicName = groupInfo.getMqResourceObj();
-        int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID));
+        int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID));
         QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
                 .topicName(topicName).clusterId(clusterId)
                 .user(groupInfo.getCreator()).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
index 124c30a32..e800fa0a1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.thirdparty.mq;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -57,13 +57,13 @@ public class TubeMqOptService {
                 throw new Exception("topic cannot be empty");
             }
             AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
-            String clusterIdStr = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID);
+            String clusterIdStr = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID);
             int clusterId = Integer.parseInt(clusterIdStr);
             QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
                     .topicName(addTopicTasksBean.getTopicName()).clusterId(clusterId)
                     .user(request.getUser()).build();
 
-            String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
             TubeManagerResponse response = httpUtils
                     .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
                             GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
@@ -91,7 +91,7 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.add("Content-Type", "application/json");
         try {
-            String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
             log.info("create tube consumer group {}  on {} ", GSON.toJson(request),
                     tubeManager + "/v1/task?method=addTopicTask");
             TubeManagerResponse rsp = httpUtils.request(tubeManager + "/v1/group?method=add",
@@ -113,7 +113,7 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.add("Content-Type", "application/json");
         try {
-            String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
             TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
                     HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
             if (response.getErrCode() == 0) { // topic already exists
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 9b7afd472..5f03986b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -31,8 +31,8 @@ import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm.OperateType;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.util.DataFlowUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -56,10 +56,10 @@ public class CreateSortConfigListener implements SortOperateListener {
     private static final Logger LOGGER = LoggerFactory.getLogger(CreateSortConfigListener.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
 
-    @Autowired
-    private CommonOperateService commonOperateService;
     @Autowired
     private StreamSinkService streamSinkService;
+    @Autowired
+    private DataFlowUtils dataFlowUtils;
 
     @Override
     public TaskEvent event() {
@@ -93,7 +93,7 @@ public class CreateSortConfigListener implements SortOperateListener {
         try {
             // TODO Support more than one sinks under a stream
             Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
-                        DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
+                        DataFlowInfo flowInfo = dataFlowUtils.createDataFlow(groupInfo, sink);
                         return Pair.of(sink.getInlongStreamId(), flowInfo);
                     }
             ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 16ccb9d05..334427afc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -27,10 +27,10 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.util.DataFlowUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -55,8 +55,6 @@ public class PushSortConfigListener implements SortOperateListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
 
-    @Autowired
-    private CommonOperateService commonOperateService;
     @Autowired
     private ClusterBean clusterBean;
     @Autowired
@@ -65,6 +63,8 @@ public class PushSortConfigListener implements SortOperateListener {
     private InlongStreamService streamService;
     @Autowired
     private StreamSinkService streamSinkService;
+    @Autowired
+    private DataFlowUtils dataFlowUtils;
 
     @Override
     public TaskEvent event() {
@@ -94,7 +94,7 @@ public class PushSortConfigListener implements SortOperateListener {
                 LOGGER.debug("sink info: {}", sinkResponse);
             }
 
-            DataFlowInfo dataFlowInfo = commonOperateService.createDataFlow(groupInfo, sinkResponse);
+            DataFlowInfo dataFlowInfo = dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("try to push config to sort: {}", JsonUtils.toJson(dataFlowInfo));
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java
new file mode 100644
index 000000000..9108afd43
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdparty.sort.util;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
+import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spring service that assembles the {@link DataFlowInfo} of a stream sink for sort.
+ */
+@Service
+public class DataFlowUtils {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private CommonOperateService commonOperateService;
+    @Autowired
+    private StreamSourceService streamSourceService;
+    @Autowired
+    private InlongStreamService streamService;
+
+    /**
+     * Build the data flow info of the given sink for sort.
+     *
+     * @param groupInfo inlong group info, used for the source info and the Pulsar cluster lookup
+     * @param sinkResponse sink info, its group id and stream id locate the sources and the stream
+     * @return data flow info carrying the sink id, source info, transformation info, sink info and properties
+     * @throws WorkflowListenerException if no source is found by the group id and stream id of the sink
+     */
+    public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
+        final String groupId = sinkResponse.getInlongGroupId();
+        final String streamId = sinkResponse.getInlongStreamId();
+        List<SourceResponse> sources = streamSourceService.listSource(groupId, streamId);
+        if (CollectionUtils.isEmpty(sources)) {
+            String errMsg = String.format("Source not found by groupId=%s and streamId=%s", groupId, streamId);
+            throw new WorkflowListenerException(errMsg);
+        }
+
+        // TODO Support more than one source and one sink
+        final SourceResponse firstSource = sources.get(0);
+        boolean allMigration = SourceInfoUtils.isBinlogAllMigration(firstSource);
+        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+
+        // Field lists are passed to the mapping helpers first, then reused for the source and sink info
+        List<FieldInfo> sourceFields = new ArrayList<>();
+        List<FieldInfo> sinkFields = new ArrayList<>();
+        List<FieldMappingUnit> mappingUnits;
+        if (!allMigration) {
+            mappingUnits = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
+                    sinkResponse.getFieldList(), sourceFields, sinkFields);
+        } else {
+            mappingUnits = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
+        }
+        FieldMappingRule mappingRule = new FieldMappingRule(mappingUnits.toArray(new FieldMappingUnit[0]));
+
+        // Source info
+        String tubeMaster = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
+        SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, tubeMaster, clusterBean,
+                groupInfo, streamInfo, firstSource, sourceFields);
+
+        // Sink info and transformation info
+        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(firstSource, sinkResponse, sinkFields);
+        TransformationInfo transformation = new TransformationInfo(mappingRule);
+
+        // Properties: sink properties (if any) plus the group id
+        Map<String, Object> properties = new HashMap<>();
+        if (!MapUtils.isEmpty(sinkResponse.getProperties())) {
+            properties.putAll(sinkResponse.getProperties());
+        }
+        properties.put(InlongGroupSettings.DATA_FLOW_GROUP_ID_KEY, groupId);
+
+        return new DataFlowInfo(sinkResponse.getId(), sourceInfo, transformation, sinkInfo, properties);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
new file mode 100644
index 000000000..245bef130
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+
+import java.util.List;
+
+/**
+ * Service layer interface for stream transform
+ */
+public interface StreamTransformService {
+
+    /**
+     * Save the transform information.
+     *
+     * @param transformRequest transform information to save
+     * @param operator name of the operator
+     * @return transform id after saving
+     */
+    Integer save(TransformRequest transformRequest, String operator);
+
+    /**
+     * Query transform information based on inlong group id and inlong stream id.
+     *
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return list of transform information
+     */
+    List<TransformResponse> listTransform(String groupId, String streamId);
+
+    /**
+     * Modify data transform information.
+     *
+     * @param transformRequest transform information to modify
+     * @param operator name of the operator
+     * @return Whether succeed
+     */
+    boolean update(TransformRequest transformRequest, String operator);
+
+    /**
+     * Delete the stream transform by the given group id, stream id and transform name.
+     *
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param transformName name of the transform to delete
+     * @param operator name of the operator
+     * @return Whether succeed
+     */
+    boolean delete(String groupId, String streamId, String transformName, String operator);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
new file mode 100644
index 000000000..8c1082e6a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of transform service interface
+ */
+@Service
+@Slf4j
+public class StreamTransformServiceImpl implements StreamTransformService {
+
+    @Autowired
+    protected StreamTransformEntityMapper transformEntityMapper;
+    @Autowired
+    protected CommonOperateService commonOperateService;
+
+    /**
+     * Save the transform after checking the request, the group status and the name uniqueness.
+     *
+     * @throws BusinessException if a transform with the same group id, stream id and transform name exists
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+    public Integer save(TransformRequest transformRequest, String operator) {
+        log.info("begin to save transform info: {}", transformRequest);
+        this.checkParams(transformRequest);
+
+        // Check if can be added
+        final String groupId = transformRequest.getInlongGroupId();
+        final String streamId = transformRequest.getInlongStreamId();
+        final String transformName = transformRequest.getTransformName();
+        commonOperateService.checkGroupStatus(groupId, operator);
+
+        List<StreamTransformEntity> transformEntities = transformEntityMapper.selectByRelatedId(groupId,
+                streamId, transformName);
+        if (CollectionUtils.isNotEmpty(transformEntities)) {
+            String err = "stream transform already exists with groupId=%s, streamId=%s, transformName=%s";
+            throw new BusinessException(String.format(err, groupId, streamId, transformName));
+        }
+        StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+                StreamTransformEntity::new);
+        transformEntity.setCreator(operator);
+        transformEntity.setModifier(operator);
+        Date now = new Date();
+        transformEntity.setCreateTime(now);
+        transformEntity.setModifyTime(now);
+        transformEntityMapper.insertSelective(transformEntity);
+        return transformEntity.getId();
+    }
+
+    /**
+     * List the transforms found by the given group id and stream id, or an empty list if none is found.
+     */
+    @Override
+    public List<TransformResponse> listTransform(String groupId, String streamId) {
+        log.info("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        // No transform name is given here; presumably the mapper then matches every transform of the stream
+        List<StreamTransformEntity> transformEntities = transformEntityMapper.selectByRelatedId(groupId, streamId,
+                null);
+        if (CollectionUtils.isEmpty(transformEntities)) {
+            return Collections.emptyList();
+        }
+        return transformEntities.stream()
+                .map(transformEntity -> CommonBeanUtils.copyProperties(transformEntity, TransformResponse::new))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Update the transform identified by the id of the request.
+     *
+     * @return true if the mapper reports at least one updated row
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+    public boolean update(TransformRequest transformRequest, String operator) {
+        log.info("begin to update transform info: {}", transformRequest);
+        this.checkParams(transformRequest);
+        // Check if can be modified
+        String groupId = transformRequest.getInlongGroupId();
+        commonOperateService.checkGroupStatus(groupId, operator);
+        Preconditions.checkNotNull(transformRequest.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+        StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+                StreamTransformEntity::new);
+        transformEntity.setModifier(operator);
+        // NOTE(review): the version is whatever was copied from the request; presumably this fails
+        // when the request carries no version - confirm
+        transformEntity.setVersion(transformEntity.getVersion() + 1);
+        transformEntity.setModifyTime(new Date());
+        // The mapper result is presumably the affected row count (MyBatis convention - confirm),
+        // so it must not be compared with the entity id
+        return transformEntityMapper.updateByIdSelective(transformEntity) > 0;
+    }
+
+    /**
+     * Logically delete the transforms found by the given group id, stream id and transform name.
+     *
+     * @return always true, also when no transform is found
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+    public boolean delete(String groupId, String streamId, String transformName, String operator) {
+        log.info("begin to delete transform by groupId={} streamId={}, transformName={}", groupId, streamId,
+                transformName);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        commonOperateService.checkGroupStatus(groupId, operator);
+        Date now = new Date();
+        List<StreamTransformEntity> entityList = transformEntityMapper.selectByRelatedId(groupId, streamId,
+                transformName);
+        if (CollectionUtils.isNotEmpty(entityList)) {
+            for (StreamTransformEntity entity : entityList) {
+                Integer id = entity.getId();
+                entity.setVersion(entity.getVersion() + 1);
+                // The delete flag is set to the id of the entity itself
+                entity.setIsDeleted(id);
+                entity.setModifier(operator);
+                entity.setModifyTime(now);
+                transformEntityMapper.updateByIdSelective(entity);
+            }
+        }
+        log.info("success to logic delete transform by groupId={}, streamId={}, transformName={}", groupId, streamId,
+                transformName);
+        return true;
+    }
+
+    /**
+     * Check that the request, its group id, stream id, transform type and transform name are not null.
+     */
+    private void checkParams(TransformRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String groupId = request.getInlongGroupId();
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        String streamId = request.getInlongStreamId();
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        String transformType = request.getTransformType();
+        Preconditions.checkNotNull(transformType, ErrorCodeEnum.TRANSFORM_TYPE_IS_NULL.getMessage());
+        String transformName = request.getTransformName();
+        Preconditions.checkNotNull(transformName, ErrorCodeEnum.TRANSFORM_NAME_IS_NULL.getMessage());
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 5022d48d4..2a656d8fe 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -25,8 +25,8 @@ import org.springframework.boot.test.context.SpringBootTest;
 @SpringBootTest(classes = ServiceBaseTest.class)
 public class ServiceBaseTest extends BaseTest {
 
-    public final String globalGroupId = "b_group1";
-    public final String globalStreamId = "stream1";
-    public final String globalOperator = "admin";
+    public static final String GLOBAL_GROUP_ID = "b_group1";
+    public static final String GLOBAL_STREAM_ID = "stream1";
+    public static final String GLOBAL_OPERATOR = "admin";
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e387afef2..48946c2af 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -44,14 +44,14 @@ public class AgentServiceTest extends ServiceBaseTest {
     private InlongStreamServiceTest streamServiceTest;
 
     public Integer saveSource() {
-        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+        streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
 
         BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
-        sourceInfo.setInlongGroupId(globalGroupId);
-        sourceInfo.setInlongStreamId(globalStreamId);
+        sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+        sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
         sourceInfo.setSourceType(SourceType.BINLOG.getType());
 
-        return sourceService.save(sourceInfo, globalOperator);
+        return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }
 
     @Test
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
         Boolean result = agentService.reportSnapshot(request);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+        sourceService.delete(id, SourceType.BINLOG.getType(), GLOBAL_OPERATOR);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
index c18304795..64394c678 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
 import org.junit.Assert;
@@ -46,17 +46,17 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         request.setType(type);
         request.setIp(ip);
         request.setPort(port);
-        request.setInCharges(globalOperator);
-        return clusterService.save(request, globalOperator);
+        request.setInCharges(GLOBAL_OPERATOR);
+        return clusterService.save(request, GLOBAL_OPERATOR);
     }
 
     public Boolean deleteOpt(Integer id) {
-        return clusterService.delete(id, globalOperator);
+        return clusterService.delete(id, GLOBAL_OPERATOR);
     }
 
     @Test
     public void testSaveAndDelete() {
-        Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, CLUSTER_IP, CLUSTER_PORT);
+        Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, CLUSTER_IP, CLUSTER_PORT);
         Assert.assertNotNull(id);
 
         Boolean success = this.deleteOpt(id);
@@ -69,7 +69,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Integer p1 = 46800;
         Integer p2 = 46801;
         String url = "127.0.0.1:" + p1 + ",127.0.0.2";
-        Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, url, p2);
+        Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, url, p2);
         Assert.assertNotNull(id);
 
         // Get the data proxy cluster ip list, the first port should is p1, second port is p2
@@ -86,7 +86,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         // Simulate saving and parsing dirty url without port, default port is p1
         Integer p1 = 46801;
         String url = ":,,, :127.0 .0.1:,: ,,,";
-        Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, url, p1);
+        Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, url, p1);
         List<DataProxyResponse> ipList = clusterService.getIpList(CLUSTER_NAME);
         // The result port is p1
         Assert.assertEquals(p1, ipList.get(0).getPort());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java
new file mode 100644
index 000000000..a03fbf0e6
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import springfox.boot.starter.autoconfigure.OpenApiAutoConfiguration;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Unit test for {@link StreamTransformService}.
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@EnableAutoConfiguration(exclude = OpenApiAutoConfiguration.class)
+public class StreamTransformServiceTest extends ServiceBaseTest {
+
+    public static final String TRANSFORM_NAME = "test_transform";
+
+    @Autowired
+    protected StreamTransformService streamTransformService;
+    @Autowired
+    protected StreamTransformEntityMapper transformEntityMapper;
+
+    /**
+     * Inserts one transform row directly through the mapper, then checks that
+     * {@link StreamTransformService#listTransform} returns exactly one transform
+     * for the same group id and stream id.
+     */
+    @Test
+    public void testSaveStreamTransform() {
+        TransformRequest transformRequest = new TransformRequest();
+        transformRequest.setTransformName(TRANSFORM_NAME);
+        transformRequest.setTransformType(TransformType.FILTER.getType());
+        transformRequest.setTransformDefinition("{}");
+        transformRequest.setInlongStreamId(GLOBAL_STREAM_ID);
+        transformRequest.setInlongGroupId(GLOBAL_GROUP_ID);
+        StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+                StreamTransformEntity::new);
+        transformEntity.setCreator(GLOBAL_OPERATOR);
+        transformEntity.setModifier(GLOBAL_OPERATOR);
+        Date now = new Date();
+        transformEntity.setCreateTime(now);
+        transformEntity.setModifyTime(now);
+        // assertEquals (rather than assertTrue on ==) reports the actual value when the check fails
+        int insertedRows = transformEntityMapper.insertSelective(transformEntity);
+        Assert.assertEquals(1, insertedRows);
+
+        List<TransformResponse> transformResponses = streamTransformService.listTransform(GLOBAL_GROUP_ID,
+                GLOBAL_STREAM_ID);
+        Assert.assertEquals(1, transformResponses.size());
+    }
+
+}
diff --git a/inlong-manager/manager-test/src/main/resources/application-test.properties b/inlong-manager/manager-test/src/main/resources/application-test.properties
index 26a274dd7..471175492 100644
--- a/inlong-manager/manager-test/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-test.properties
@@ -100,3 +100,4 @@ common.http-client.validateAfterInactivity=5000
 common.http-client.connectionTimeout=3000
 common.http-client.readTimeout=10000
 common.http-client.connectionRequestTimeout=3000
+spring.main.allow-circular-references=true
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 4322e502f..a2b05e03c 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -477,6 +477,29 @@ CREATE TABLE `stream_source`
     KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
 );
 
+-- ----------------------------
+-- Table structure for stream_transform (transform nodes configured per inlong stream)
+-- ----------------------------
+CREATE TABLE `stream_transform`
+(
+    `id`                   int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`      varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`     varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `transform_name`       varchar(128) NOT NULL COMMENT 'Transform name, unique in one stream',
+    `transform_type`       varchar(20)  NOT NULL COMMENT 'Transform type, including: splitter, filter, joiner, etc.',
+    `pre_node_names`       text         NOT NULL COMMENT 'Pre node names of transform in this stream',
+    `post_node_names`      text COMMENT 'Post node names of transform in this stream',
+    `transform_definition` text         NOT NULL COMMENT 'Transform definition in json type',
+    `version`              int(11)      NOT NULL DEFAULT '1' COMMENT 'Stream transform version',
+    `is_deleted`           int(11)      NOT NULL DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`              varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`             varchar(64)           DEFAULT '' COMMENT 'Modifier name',
+    `create_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+);
+
 -- ----------------------------
 -- Table structure for stream_sink
 -- ----------------------------
@@ -520,6 +543,31 @@ CREATE TABLE `stream_sink_ext`
     KEY `index_sink_id` (`sink_id`)
 );
 
+-- ----------------------------
+-- Table structure for stream_source_field
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_source_field`;
+CREATE TABLE `stream_source_field`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `source_id`        int(11)      NOT NULL COMMENT 'Source id',
+    `source_type`      varchar(15)  NOT NULL COMMENT 'Source type',
+    `field_name`       varchar(20)  NOT NULL COMMENT 'field name',
+    `field_value`      varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
+    `pre_expression`   varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
+    `field_type`       varchar(20)  NOT NULL COMMENT 'field type',
+    `field_comment`    varchar(50)  DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`    smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`     varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+    `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    PRIMARY KEY (`id`),
+    KEY `index_source_id` (`source_id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+
 -- ----------------------------
 -- Table structure for stream_sink_field
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0a2f70b1d..8c55cc281 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -502,6 +502,30 @@ CREATE TABLE `stream_source`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
+-- ----------------------------
+-- Table structure for stream_transform (transform nodes configured per inlong stream)
+-- ----------------------------
+CREATE TABLE `stream_transform`
+(
+    `id`                   int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`      varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`     varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `transform_name`       varchar(128) NOT NULL COMMENT 'Transform name, unique in one stream',
+    `transform_type`       varchar(20)  NOT NULL COMMENT 'Transform type, including: splitter, filter, joiner, etc.',
+    `pre_node_names`       text         NOT NULL COMMENT 'Pre node names of transform in this stream',
+    `post_node_names`      text COMMENT 'Post node names of transform in this stream',
+    `transform_definition` text         NOT NULL COMMENT 'Transform definition in json type',
+    `version`              int(11)      NOT NULL DEFAULT '1' COMMENT 'Stream transform version',
+    `is_deleted`           int(11)      NOT NULL DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`              varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`             varchar(64)           DEFAULT '' COMMENT 'Modifier name',
+    `create_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform table';
+
 -- ----------------------------
 -- Table structure for stream_sink
 -- ----------------------------
@@ -547,6 +571,31 @@ CREATE TABLE `stream_sink_ext`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink extension table';
 
+-- ----------------------------
+-- Table structure for stream_source_field
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_source_field`;
+CREATE TABLE `stream_source_field`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `source_id`        int(11)      NOT NULL COMMENT 'Source id',
+    `source_type`      varchar(15)  NOT NULL COMMENT 'Source type',
+    `field_name`       varchar(20)  NOT NULL COMMENT 'field name',
+    `field_value`      varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
+    `pre_expression`   varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
+    `field_type`       varchar(20)  NOT NULL COMMENT 'field type',
+    `field_comment`    varchar(50)  DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`    smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`     varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+    `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    PRIMARY KEY (`id`),
+    KEY `index_source_id` (`source_id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+
 -- ----------------------------
 -- Table structure for stream_sink_field
 -- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
new file mode 100644
index 000000000..c0e201960
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.LoginUserUtils;
+import org.apache.inlong.manager.service.core.operationlog.OperationLog;
+import org.apache.inlong.manager.service.transform.StreamTransformService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * Stream transform control layer, exposing save / list / update / delete under {@code /transform}.
+ */
+@RestController
+@RequestMapping("/transform")
+@Api(tags = "Stream transform config")
+@Slf4j
+public class StreamTransformController {
+
+    @Autowired
+    protected StreamTransformService streamTransformService;
+
+    /**
+     * Saves a stream transform on behalf of the current login user.
+     *
+     * @param request validated transform info taken from the request body
+     * @return response wrapping the result of {@code streamTransformService.save}
+     */
+    @RequestMapping(value = "/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE)
+    @ApiOperation(value = "Save stream transform")
+    public Response<Integer> save(@Validated @RequestBody TransformRequest request) {
+        return Response.success(
+                streamTransformService.save(request, LoginUserUtils.getLoginUserDetail().getUserName()));
+    }
+
+    /**
+     * Lists the stream transforms for the given inlong group and stream.
+     *
+     * @param groupId value of the {@code inlongGroupId} query parameter
+     * @param streamId value of the {@code inlongStreamId} query parameter
+     * @return response wrapping the list returned by {@code streamTransformService.listTransform}
+     */
+    @RequestMapping(value = "/list", method = RequestMethod.GET)
+    @ApiOperation(value = "Query stream transform list")
+    public Response<List<TransformResponse>> list(@RequestParam("inlongGroupId") String groupId,
+            @RequestParam("inlongStreamId") String streamId) {
+        return Response.success(streamTransformService.listTransform(groupId, streamId));
+    }
+
+    /**
+     * Updates a stream transform on behalf of the current login user.
+     *
+     * @param request validated transform info taken from the request body
+     * @return response wrapping the result of {@code streamTransformService.update}
+     */
+    @RequestMapping(value = "/update", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Modify stream transform")
+    public Response<Boolean> update(@Validated @RequestBody TransformRequest request) {
+        return Response.success(
+                streamTransformService.update(request, LoginUserUtils.getLoginUserDetail().getUserName()));
+    }
+
+    /**
+     * Deletes the named stream transform on behalf of the current login user.
+     *
+     * @param groupId value of the {@code inlongGroupId} query parameter
+     * @param streamId value of the {@code inlongStreamId} query parameter
+     * @param transformName value of the {@code transformName} query parameter
+     * @return response wrapping the result of {@code streamTransformService.delete}
+     */
+    @RequestMapping(value = "/delete", method = RequestMethod.DELETE)
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Delete stream transform")
+    public Response<Boolean> delete(@RequestParam("inlongGroupId") String groupId,
+            @RequestParam("inlongStreamId") String streamId, @RequestParam("transformName") String transformName) {
+        return Response.success(
+                streamTransformService.delete(groupId, streamId, transformName,
+                        LoginUserUtils.getLoginUserDetail().getUserName()));
+    }
+}