You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/05 14:04:10 UTC

[incubator-inlong] branch master updated: [INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 011d12bb0 [INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)
011d12bb0 is described below

commit 011d12bb07d0f652ecf3f5e103f27c1129324caa
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu May 5 22:04:05 2022 +0800

    [INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)
---
 .../inlong/manager/client/api/InlongStream.java    |   6 +-
 .../api/impl/DefaultInlongStreamBuilder.java       | 162 ++++++++++----------
 .../manager/client/api/impl/InlongGroupImpl.java   |   4 +-
 .../manager/client/api/impl/InlongStreamImpl.java  | 165 +++++++++++++++++++--
 .../client/api/inner/InnerInlongManagerClient.java |  56 +++++++
 .../client/api/impl/InlongStreamImplTest.java      |   4 +-
 .../manager/service/sort/util/FieldInfoUtils.java  |  18 ++-
 .../manager/service/sort/util/LoadNodeUtils.java   |  10 +-
 8 files changed, 311 insertions(+), 114 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 169b7befa..26a3a2c6f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -46,9 +46,5 @@ public abstract class InlongStream {
 
     public abstract StreamPipeline createPipeline();
 
-    @Deprecated
-    public abstract void updateSource(StreamSource source);
-
-    @Deprecated
-    public abstract void updateSink(StreamSink sink);
+    public abstract InlongStream update();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 60f20df6f..c94d39ee6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -25,9 +25,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
@@ -36,8 +33,6 @@ import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
-import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@@ -46,6 +41,9 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
 import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 
@@ -72,7 +70,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         InnerStreamContext streamContext = new InnerStreamContext(stream);
         groupContext.setStreamContext(streamContext);
         this.streamContext = streamContext;
-        this.inlongStream = new InlongStreamImpl(stream.getName());
+        this.inlongStream = new InlongStreamImpl(groupContext.getGroupInfo().getName(), stream.getName(),
+                managerClient);
         if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
             this.inlongStream.setStreamFields(streamConf.getStreamFields());
         }
@@ -150,13 +149,12 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
         if (existMsg.getKey()) {
             Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
-            if (updateMsg.getKey()) {
-                initOrUpdateTransform();
-                initOrUpdateSource();
-                initOrUpdateSink();
-            } else {
+            if (!updateMsg.getKey()) {
                 throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
             }
+            initOrUpdateTransform();
+            initOrUpdateSource();
+            initOrUpdateSink();
             return inlongStream;
         } else {
             return init();
@@ -169,6 +167,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+        List<String> updateTransformNames = Lists.newArrayList();
         for (TransformResponse transformResponse : transformResponses) {
             StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
             String transformName = transform.getTransformName();
@@ -186,93 +185,94 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
                     throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
                             updateState.getValue()));
                 }
+                transformRequest.setId(transformResponse.getId());
+                updateTransformNames.add(transformName);
             }
         }
-    }
-
-    private void initOrUpdateSource() {
-        List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
-        for (SourceRequest sourceRequest : sourceRequests) {
-            sourceRequest.setId(initOrUpdateSource(sourceRequest));
+        for (Map.Entry<String, TransformRequest> requestEntry : transformRequests.entrySet()) {
+            String transformName = requestEntry.getKey();
+            if (updateTransformNames.contains(transformName)) {
+                continue;
+            }
+            TransformRequest transformRequest = requestEntry.getValue();
+            String index = managerClient.createTransform(transformRequest);
+            transformRequest.setId(Double.valueOf(index).intValue());
         }
     }
 
-    private int initOrUpdateSource(SourceRequest sourceRequest) {
-        String sourceType = sourceRequest.getSourceType();
-        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
-            List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
-                    sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
-            if (CollectionUtils.isEmpty(responses)) {
-                String sourceIndex = managerClient.createSource(sourceRequest);
-                return Double.valueOf(sourceIndex).intValue();
-            } else {
-                SourceListResponse sourceListResponse = null;
-                for (SourceListResponse response : responses) {
-                    if (response.getSourceName().equals(sourceRequest.getSourceName())) {
-                        sourceListResponse = response;
-                        break;
-                    }
+    private void initOrUpdateSource() {
+        Map<String, SourceRequest> sourceRequests = streamContext.getSourceRequests();
+        InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
+        List<String> updateSourceNames = Lists.newArrayList();
+        for (SourceListResponse sourceListResponse : sourceListResponses) {
+            String sourceName = sourceListResponse.getSourceName();
+            int id = sourceListResponse.getId();
+            String type = sourceListResponse.getSourceType();
+            if (sourceRequests.get(sourceName) == null) {
+                boolean isDelete = managerClient.deleteSource(id, type);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
                 }
-                if (sourceListResponse == null) {
-                    String sourceIndex = managerClient.createSource(sourceRequest);
-                    return Double.valueOf(sourceIndex).intValue();
+            } else {
+                SourceRequest sourceRequest = sourceRequests.get(sourceName);
+                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
+                            updateState.getValue()));
                 }
+                updateSourceNames.add(sourceName);
                 sourceRequest.setId(sourceListResponse.getId());
-                Pair<Boolean, String> updateMsg = managerClient.updateSource(sourceRequest);
-                if (updateMsg.getKey()) {
-                    return sourceListResponse.getId();
-                } else {
-                    throw new RuntimeException(
-                            String.format("Update source:%s failed with ex:%s", GsonUtil.toJson(sourceRequest),
-                                    updateMsg.getValue()));
-                }
             }
-        } else {
-            throw new IllegalArgumentException(String.format("Unsupported source type:%s", sourceType));
         }
-    }
-
-    private void initOrUpdateSink() {
-        List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
-        for (SinkRequest sinkRequest : sinkRequests) {
-            sinkRequest.setId(initOrUpdateSink(sinkRequest));
+        for (Map.Entry<String, SourceRequest> requestEntry : sourceRequests.entrySet()) {
+            String sourceName = requestEntry.getKey();
+            if (updateSourceNames.contains(sourceName)) {
+                continue;
+            }
+            SourceRequest sourceRequest = requestEntry.getValue();
+            String index = managerClient.createSource(sourceRequest);
+            sourceRequest.setId(Double.valueOf(index).intValue());
         }
     }
 
-    private int initOrUpdateSink(SinkRequest sinkRequest) {
-        String sinkType = sinkRequest.getSinkType();
-        boolean flag = SinkType.HIVE.name().equals(sinkType) || SinkType.KAFKA.name().equals(sinkType)
-                || SinkType.CLICKHOUSE.name().equals(sinkType);
-        if (flag) {
-            List<SinkListResponse> responses = managerClient.listSinks(sinkRequest.getInlongGroupId(),
-                    sinkRequest.getInlongStreamId(), sinkRequest.getSinkType());
-            if (CollectionUtils.isEmpty(responses)) {
-                String sinkIndex = managerClient.createSink(sinkRequest);
-                return Double.valueOf(sinkIndex).intValue();
-            } else {
-                SinkListResponse sinkListResponse = null;
-                for (SinkListResponse response : responses) {
-                    if (response.getSinkName().equals(sinkRequest.getSinkName())) {
-                        sinkListResponse = response;
-                        break;
-                    }
+    private void initOrUpdateSink() {
+        Map<String, SinkRequest> sinkRequests = streamContext.getSinkRequests();
+        InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        List<SinkListResponse> sinkListResponses = managerClient.listSinks(groupId, streamId);
+        List<String> updateSinkNames = Lists.newArrayList();
+        for (SinkListResponse sinkListResponse : sinkListResponses) {
+            String sinkName = sinkListResponse.getSinkName();
+            int id = sinkListResponse.getId();
+            String type = sinkListResponse.getSinkType();
+            if (sinkRequests.get(sinkName) == null) {
+                boolean isDelete = managerClient.deleteSink(id, type);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
                 }
-                if (sinkListResponse == null) {
-                    String sinkIndex = managerClient.createSink(sinkRequest);
-                    return Double.valueOf(sinkIndex).intValue();
+            } else {
+                SinkRequest sinkRequest = sinkRequests.get(sinkName);
+                Pair<Boolean, String> updateState = managerClient.updateSink(sinkRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
+                            updateState.getValue()));
                 }
+                updateSinkNames.add(sinkName);
                 sinkRequest.setId(sinkListResponse.getId());
-                Pair<Boolean, String> updateMsg = managerClient.updateSink(sinkRequest);
-                if (updateMsg.getKey()) {
-                    return sinkListResponse.getId();
-                } else {
-                    throw new RuntimeException(
-                            String.format("Update sink:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
-                                    updateMsg.getValue()));
-                }
             }
-        } else {
-            throw new IllegalArgumentException(String.format("Unsupported sink type:%s", sinkType));
+        }
+        for (Map.Entry<String, SinkRequest> requestEntry : sinkRequests.entrySet()) {
+            String sinkName = requestEntry.getKey();
+            if (updateSinkNames.contains(sinkName)) {
+                continue;
+            }
+            SinkRequest sinkRequest = requestEntry.getValue();
+            String index = managerClient.createSink(sinkRequest);
+            sinkRequest.setId(Double.valueOf(index).intValue());
         }
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8f7e1b360..825ba0689 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -271,6 +271,8 @@ public class InlongGroupImpl implements InlongGroup {
         if (CollectionUtils.isEmpty(streamResponses)) {
             return null;
         }
-        return streamResponses.stream().map(InlongStreamImpl::new).collect(Collectors.toList());
+        return streamResponses.stream()
+                .map(fullStreamResponse -> new InlongStreamImpl(fullStreamResponse, managerClient))
+                .collect(Collectors.toList());
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 1915eaa96..60515cf1f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -22,19 +22,23 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
 import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
@@ -42,6 +46,11 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
 import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 
 import java.util.List;
 import java.util.Map;
@@ -50,9 +59,12 @@ import java.util.stream.Collectors;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
-@NoArgsConstructor
 public class InlongStreamImpl extends InlongStream {
 
+    private InnerInlongManagerClient managerClient;
+
+    private String groupName;
+
     private String name;
 
     private Map<String, StreamSource> streamSources = Maps.newHashMap();
@@ -63,9 +75,11 @@ public class InlongStreamImpl extends InlongStream {
 
     private List<StreamField> streamFields = Lists.newArrayList();
 
-    public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
+    public InlongStreamImpl(FullStreamResponse fullStreamResponse, InnerInlongManagerClient managerClient) {
         InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
+        this.managerClient = managerClient;
         this.name = streamInfo.getName();
+        this.groupName = streamInfo.getInlongGroupId().substring(2);
         List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
             this.streamFields = streamFieldInfos.stream()
@@ -106,7 +120,9 @@ public class InlongStreamImpl extends InlongStream {
 
     }
 
-    public InlongStreamImpl(String name) {
+    public InlongStreamImpl(String group, String name, InnerInlongManagerClient managerClient) {
+        this.managerClient = managerClient;
+        this.groupName = group;
         this.name = name;
     }
 
@@ -234,15 +250,136 @@ public class InlongStreamImpl extends InlongStream {
     }
 
     @Override
-    public void updateSource(StreamSource source) {
-        AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
-        streamSources.put(source.getSourceName(), source);
+    public InlongStream update() {
+        InlongStreamInfo streamInfo = new InlongStreamInfo();
+        streamInfo.setInlongStreamId("b_" + name);
+        streamInfo.setInlongGroupId("b_" + groupName);
+        streamInfo = managerClient.getStreamInfo(streamInfo);
+        if (streamInfo == null) {
+            throw new IllegalArgumentException(
+                    String.format("Stream is not exists for group=%s and stream=%s", groupName, name));
+        }
+        streamInfo.setFieldList(InlongStreamTransfer.createStreamFields(this.streamFields, streamInfo));
+        StreamPipeline streamPipeline = createPipeline();
+        streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
+        Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(streamInfo);
+        if (!updateMsg.getKey()) {
+            throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
+        }
+        initOrUpdateTransform(streamInfo);
+        initOrUpdateSource(streamInfo);
+        initOrUpdateSink(streamInfo);
+        return this;
     }
 
-    @Override
-    public void updateSink(StreamSink sink) {
-        AssertUtil.notNull(sink.getSinkName(), "Sink name should not be empty");
-        streamSinks.put(sink.getSinkName(), sink);
+    private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
+        final String groupId = "b_" + groupName;
+        final String streamId = "b_" + name;
+        List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+        List<String> updateTransformNames = Lists.newArrayList();
+        for (TransformResponse transformResponse : transformResponses) {
+            StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
+            String transformName = transform.getTransformName();
+            if (this.streamTransforms.get(transformName) == null) {
+                TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+                        streamInfo);
+                boolean isDelete = managerClient.deleteTransform(transformRequest);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+                }
+            } else {
+                StreamTransform newTransform = this.streamTransforms.get(transformName);
+                TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(newTransform,
+                        streamInfo);
+                Pair<Boolean, String> updateState = managerClient.updateTransform(transformRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
+                            updateState.getValue()));
+                }
+                updateTransformNames.add(transformName);
+            }
+        }
+        for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
+            String transformName = transformEntry.getKey();
+            if (updateTransformNames.contains(transformName)) {
+                continue;
+            }
+            StreamTransform transform = transformEntry.getValue();
+            TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+                    streamInfo);
+            managerClient.createTransform(transformRequest);
+        }
+    }
+
+    private void initOrUpdateSource(InlongStreamInfo streamInfo) {
+        final String groupId = "b_" + groupName;
+        final String streamId = "b_" + name;
+        List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
+        List<String> updateSourceNames = Lists.newArrayList();
+        for (SourceListResponse sourceListResponse : sourceListResponses) {
+            String sourceName = sourceListResponse.getSourceName();
+            int id = sourceListResponse.getId();
+            String type = sourceListResponse.getSourceType();
+            if (this.streamSources.get(sourceName) == null) {
+                boolean isDelete = managerClient.deleteSource(id, type);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
+                }
+            } else {
+                StreamSource source = this.streamSources.get(sourceName);
+                SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(source, streamInfo);
+                Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
+                            updateState.getValue()));
+                }
+                updateSourceNames.add(sourceName);
+            }
+        }
+        for (Map.Entry<String, StreamSource> requestEntry : streamSources.entrySet()) {
+            String sourceName = requestEntry.getKey();
+            if (updateSourceNames.contains(sourceName)) {
+                continue;
+            }
+            StreamSource streamSource = requestEntry.getValue();
+            SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(streamSource, streamInfo);
+            managerClient.createSource(sourceRequest);
+        }
     }
 
+    private void initOrUpdateSink(InlongStreamInfo streamInfo) {
+        final String groupId = "b_" + groupName;
+        final String streamId = "b_" + name;
+        List<SinkListResponse> sinkListResponses = managerClient.listSinks(groupId, streamId);
+        List<String> updateSinkNames = Lists.newArrayList();
+        for (SinkListResponse sinkListResponse : sinkListResponses) {
+            String sinkName = sinkListResponse.getSinkName();
+            int id = sinkListResponse.getId();
+            String type = sinkListResponse.getSinkType();
+            if (this.streamSinks.get(sinkName) == null) {
+                boolean isDelete = managerClient.deleteSink(id, type);
+                if (!isDelete) {
+                    throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
+                }
+            } else {
+                StreamSink sink = this.streamSinks.get(sinkName);
+                SinkRequest sinkRequest = InlongStreamSinkTransfer.createSinkRequest(sink, streamInfo);
+                Pair<Boolean, String> updateState = managerClient.updateSink(sinkRequest);
+                if (!updateState.getKey()) {
+                    throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
+                            updateState.getValue()));
+                }
+                updateSinkNames.add(sinkName);
+            }
+        }
+        for (Map.Entry<String, StreamSink> requestEntry : streamSinks.entrySet()) {
+            String sinkName = requestEntry.getKey();
+            if (updateSinkNames.contains(sinkName)) {
+                continue;
+            }
+            StreamSink streamSink = requestEntry.getValue();
+            SinkRequest sinkRequest = InlongStreamSinkTransfer.createSinkRequest(streamSink, streamInfo);
+            managerClient.createSink(sinkRequest);
+        }
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index e8a2a6dd1..1e67fa5ad 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -481,6 +481,34 @@ public class InnerInlongManagerClient {
         }
     }
 
+    public boolean deleteSource(int id, String type) {
+        AssertUtil.isTrue(id > 0, "sourceId is illegal");
+        AssertUtil.notEmpty(type, "sourceType should not be null");
+        final String path = HTTP_PATH + "/source/delete/" + id;
+        String url = formatUrl(path);
+        url = String.format("%s&sourceType=%s", url, type);
+        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
+        Request request = new Request.Builder()
+                .url(url)
+                .method("DELETE", requestBody)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            okhttp3.Response response = call.execute();
+            assert response.body() != null;
+            String body = response.body().string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+            return Boolean.parseBoolean(responseBody.getData().toString());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Inlong source delete failed: %s", e.getMessage()), e);
+        }
+    }
+
     public String createTransform(TransformRequest transformRequest) {
         String path = HTTP_PATH + "/transform/save";
         final String sink = GsonUtil.toJson(transformRequest);
@@ -611,6 +639,34 @@ public class InnerInlongManagerClient {
         }
     }
 
+    /**
+     * Deletes a stream sink by sending an HTTP DELETE (with an empty JSON body) to "/sink/delete/{id}".
+     *
+     * @return the boolean parsed from the data of the manager response
+     */
+    public boolean deleteSink(int id, String type) {
+        AssertUtil.isTrue(id > 0, "sinkId is illegal");
+        AssertUtil.notEmpty(type, "sinkType should not be null");
+        final String path = HTTP_PATH + "/sink/delete/" + id;
+        final String requestUrl = String.format("%s&sinkType=%s", formatUrl(path), type);
+        final RequestBody emptyBody = RequestBody.create(MediaType.parse("application/json"), "");
+        final Request deleteRequest = new Request.Builder().url(requestUrl).method("DELETE", emptyBody).build();
+        final Call deleteCall = httpClient.newCall(deleteRequest);
+        try {
+            final okhttp3.Response httpResponse = deleteCall.execute();
+            assert httpResponse.body() != null;
+            final String content = httpResponse.body().string();
+            assertHttpSuccess(httpResponse, content, path);
+            final Response parsed = InlongParser.parseResponse(content);
+            AssertUtil.isTrue(parsed.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", parsed.getErrMsg()));
+            return Boolean.parseBoolean(parsed.getData().toString());
+        } catch (Exception e) {
+            final String reason = String.format("Inlong sink delete failed: %s", e.getMessage());
+            throw new RuntimeException(reason, e);
+        }
+    }
+
     public List<SinkListResponse> listSinks(String groupId, String streamId) {
         return listSinks(groupId, streamId, null);
     }
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index 19d179588..14e660c23 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.client.api.impl;
 
 import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.sink.KafkaSink;
@@ -28,6 +27,7 @@ import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
 import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
 import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
 import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
 import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
@@ -41,7 +41,7 @@ public class InlongStreamImplTest {
 
     @Test
     public void testCreatePipeline() {
-        InlongStream inlongStream = new InlongStreamImpl();
+        InlongStream inlongStream = new InlongStreamImpl("group", "stream", null);
         // add stream source
         KafkaSource kafkaSource = new KafkaSource();
         kafkaSource.setSourceName("A");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index 6538dd5c6..f0a19492d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -45,7 +45,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,7 +58,7 @@ public class FieldInfoUtils {
     /**
      * Built in field map, key is field name, value is built in field name
      */
-    public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
+    public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new LinkedHashMap<>();
 
     static {
         BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
@@ -68,20 +68,28 @@ public class FieldInfoUtils {
         BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.PROCESSING_TIME.getName(), BuiltInField.PROCESS_TIME);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.UPDATE_BEFORE.getName(), BuiltInField.METADATA_UPDATE_BEFORE);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.BATCH_ID.getName(), BuiltInField.METADATA_BATCH_ID);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.SQL_TYPE.getName(), BuiltInField.METADATA_SQL_TYPE);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.TS.getName(), BuiltInField.METADATA_TS);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_TYPE.getName(), BuiltInField.METADATA_MYSQL_TYPE);
         BUILT_IN_FIELD_MAP.put(MetaFieldType.PK_NAMES.getName(), BuiltInField.METADATA_PK_NAMES);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
+    }
+
+    public static FieldInfo parseSinkFieldInfo(SinkFieldResponse sinkFieldResponse, String nodeId) {
+        final boolean metaField = sinkFieldResponse.getIsMetaField() == 1; // 1 flags a built-in field
+        final FieldInfo sinkField = getFieldInfo(sinkFieldResponse.getFieldName(),
+                sinkFieldResponse.getFieldType(), metaField, sinkFieldResponse.getFieldFormat());
+        sinkField.setNodeId(nodeId); // attach the supplied node id to the field
+        return sinkField;
     }
 
-    public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo streamField, String name) {
+    public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo streamField, String nodeId) {
         boolean isBuiltIn = streamField.getIsMetaField() == 1;
         FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
                 streamField.getFieldFormat());
-        fieldInfo.setNodeId(name);
+        fieldInfo.setNodeId(nodeId); // stamp the caller-supplied node id onto the built field info
         return fieldInfo;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 1ebd697ad..c4cae249c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -73,9 +73,8 @@ public class LoadNodeUtils {
         String bootstrapServers = kafkaSinkResponse.getBootstrapServers();
         List<SinkFieldResponse> sinkFieldResponses = kafkaSinkResponse.getFieldList();
         List<FieldInfo> fieldInfos = sinkFieldResponses.stream()
-                .map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
-                        FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
-                                sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+                .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+                .collect(Collectors.toList());
         List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
         Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
@@ -127,9 +126,8 @@ public class LoadNodeUtils {
         String hiveVersion = hiveSinkResponse.getHiveVersion();
         List<SinkFieldResponse> sinkFieldResponses = hiveSinkResponse.getFieldList();
         List<FieldInfo> fields = sinkFieldResponses.stream()
-                .map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
-                        FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
-                                sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+                .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+                .collect(Collectors.toList());
         List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
         Map<String, String> properties = hiveSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));