You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/24 02:14:45 UTC

[incubator-inlong] branch master updated: [INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c1225c5  [INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)
c1225c5 is described below

commit c1225c53bd3bdaff0e845c1d4e25d10c57a141c0
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 24 10:14:39 2022 +0800

    [INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)
---
 .../api/impl/DefaultInlongStreamBuilder.java       |  14 +-
 .../manager/client/api/impl/InlongClientImpl.java  |  12 +-
 .../manager/client/api/impl/InlongStreamImpl.java  |   4 +-
 .../client/api/inner/InnerInlongManagerClient.java |  32 ++-
 .../client/api/inner/InnerStreamContext.java       |   6 +-
 .../manager/client/api/util/InlongParser.java      |   6 +-
 .../client/api/util/InlongStreamSinkTransfer.java  |  10 +-
 .../api/util/InlongStreamSourceTransfer.java       |  12 +-
 .../client/api/util/InlongStreamTransfer.java      |   8 +-
 .../inlong/manager/common/enums/SourceType.java    |   1 +
 .../common/pojo/stream/FullPageUpdateRequest.java  |  43 ----
 .../common/pojo/stream/FullStreamRequest.java      |  24 +--
 .../common/pojo/stream/FullStreamResponse.java     |  18 +-
 ...ongStreamInfo.java => InlongStreamRequest.java} |  46 +---
 ...ngStreamInfo.java => InlongStreamResponse.java} |  16 +-
 .../workflow/form/GroupResourceProcessForm.java    |   4 +-
 .../pojo/workflow/form/UpdateGroupProcessForm.java |   4 +-
 .../manager/service/CommonOperateService.java      |   4 +-
 .../manager/service/core/InlongStreamService.java  |  33 +--
 .../core/impl/InlongGroupProcessOperation.java     |   8 +-
 .../service/core/impl/InlongStreamServiceImpl.java | 236 ++++++++-------------
 .../thirdparty/sort/PushSortConfigListener.java    |   4 +-
 .../thirdparty/sort/util/SerializationUtils.java   |   9 +-
 .../thirdparty/sort/util/SourceInfoUtils.java      |   6 +-
 .../listener/StartCreateGroupProcessListener.java  |   8 +-
 .../service/core/impl/InlongStreamServiceTest.java |  21 +-
 .../source/listener/DataSourceListenerTest.java    |   8 +-
 .../thirdparty/sort/DisableZkForSortTest.java      |  10 +-
 .../service/workflow/WorkflowServiceImplTest.java  |  31 +--
 .../web/controller/InlongStreamController.java     |  18 +-
 inlong-manager/pom.xml                             |   5 +
 31 files changed, 254 insertions(+), 407 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 9abf6c9..a391f5b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -42,7 +42,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.List;
 
@@ -68,11 +68,11 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
             groupContext.setStreamContextMap(Maps.newHashMap());
         }
-        InlongStreamInfo streamInfo = InlongStreamTransfer.createStreamInfo(streamConf, groupContext.getGroupInfo());
-        InnerStreamContext streamContext = new InnerStreamContext(streamInfo);
+        InlongStreamResponse stream = InlongStreamTransfer.createStreamInfo(streamConf, groupContext.getGroupInfo());
+        InnerStreamContext streamContext = new InnerStreamContext(stream);
         groupContext.setStreamContext(streamContext);
         this.streamContext = streamContext;
-        this.inlongStream = new InlongStreamImpl(streamInfo.getName());
+        this.inlongStream = new InlongStreamImpl(stream.getName());
         if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
             this.inlongStream.setStreamFields(streamConf.getStreamFields());
         }
@@ -107,7 +107,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     @Override
     public InlongStream init() {
-        InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+        InlongStreamResponse streamInfo = streamContext.getStreamInfo();
         String streamIndex = managerClient.createStreamInfo(streamInfo);
         streamInfo.setId(Double.valueOf(streamIndex).intValue());
         //Create source and update index
@@ -127,8 +127,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     @Override
     public InlongStream initOrUpdate() {
-        InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
-        Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
+        InlongStreamResponse dataStreamInfo = streamContext.getStreamInfo();
+        Pair<Boolean, InlongStreamResponse> existMsg = managerClient.isStreamExists(dataStreamInfo);
         if (existMsg.getKey()) {
             Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
             if (updateMsg.getKey()) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 9be4a19..ac44032 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -49,7 +49,7 @@ public class InlongClientImpl implements InlongClient {
     private static final String URL_SPLITTER = ",";
     private static final String HOST_SPLITTER = ":";
     @Getter
-    private ClientConfiguration configuration;
+    private final ClientConfiguration configuration;
 
     public InlongClientImpl(String serviceUrl, ClientConfiguration configuration) {
         Map<String, String> hostPorts = Splitter.on(URL_SPLITTER).withKeyValueSeparator(HOST_SPLITTER)
@@ -76,13 +76,12 @@ public class InlongClientImpl implements InlongClient {
     }
 
     @Override
-    public InlongGroup forGroup(InlongGroupConf groupConf) throws Exception {
+    public InlongGroup forGroup(InlongGroupConf groupConf) {
         return new InlongGroupImpl(groupConf, this);
     }
 
     @Override
-    public List<InlongGroup> listGroup(String expr, int status,
-            int pageNum, int pageSize) throws Exception {
+    public List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) {
         InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
         PageInfo<InlongGroupListResponse> responsePageInfo = managerClient.listGroups(expr, status, pageNum,
                 pageSize);
@@ -103,7 +102,6 @@ public class InlongClientImpl implements InlongClient {
      *
      * @param request The request
      * @return PageInfo of group
-     * @throws Exception
      */
     @Override
     public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
@@ -112,7 +110,7 @@ public class InlongClientImpl implements InlongClient {
     }
 
     @Override
-    public InlongGroup getGroup(String groupName) throws Exception {
+    public InlongGroup getGroup(String groupName) {
         InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
         final String groupId = "b_" + groupName;
         InlongGroupResponse groupResponse = managerClient.getGroupInfo(groupId);
@@ -136,7 +134,7 @@ public class InlongClientImpl implements InlongClient {
             try {
                 socket.close();
             } catch (IOException e) {
-                log.warn("colse connection from {}:{} failed", host, port, e);
+                log.warn("close connection from {}:{} failed", host, port, e);
             }
         }
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index a102181..e1b6180 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -35,7 +35,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.List;
 import java.util.Map;
@@ -55,7 +55,7 @@ public class InlongStreamImpl extends InlongStream {
     private List<StreamField> streamFields = Lists.newArrayList();
 
     public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
-        InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
+        InlongStreamResponse streamInfo = fullStreamResponse.getStreamInfo();
         this.name = streamInfo.getName();
         List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 974fbac..3d298a7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -50,8 +50,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -166,7 +166,7 @@ public class InnerInlongManagerClient {
             pageNum = 1;
         }
         JSONObject groupQuery = new JSONObject();
-        groupQuery.put("keyWord", pageNum);
+        groupQuery.put("keyWord", keyword);
         groupQuery.put("status", status);
         groupQuery.put("pageNum", pageNum);
         groupQuery.put("pageSize", pageSize);
@@ -205,10 +205,8 @@ public class InnerInlongManagerClient {
      *
      * @param inlongGroupRequest The inlongGroupRequest
      * @return Response encapsulate of inlong group list
-     * @throws Exception may throws exception
      */
-    public Response<PageInfo<InlongGroupListResponse>> listGroups(
-            InlongGroupPageRequest inlongGroupRequest)
+    public Response<PageInfo<InlongGroupListResponse>> listGroups(InlongGroupPageRequest inlongGroupRequest)
             throws Exception {
         String requestParams = GsonUtil.toJson(inlongGroupRequest);
         RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestParams);
@@ -284,7 +282,7 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public String createStreamInfo(InlongStreamInfo streamInfo) {
+    public String createStreamInfo(InlongStreamResponse streamInfo) {
         String path = HTTP_PATH + "/stream/save";
         final String stream = GsonUtil.toJson(streamInfo);
         final RequestBody streamBody = RequestBody.create(MediaType.parse("application/json"), stream);
@@ -309,8 +307,8 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public Pair<Boolean, InlongStreamInfo> isStreamExists(InlongStreamInfo streamInfo) {
-        InlongStreamInfo currentStreamInfo = getStreamInfo(streamInfo);
+    public Pair<Boolean, InlongStreamResponse> isStreamExists(InlongStreamResponse streamInfo) {
+        InlongStreamResponse currentStreamInfo = getStreamInfo(streamInfo);
         if (currentStreamInfo != null) {
             return Pair.of(true, currentStreamInfo);
         } else {
@@ -318,7 +316,7 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) {
+    public Pair<Boolean, String> updateStreamInfo(InlongStreamResponse streamInfo) {
         streamInfo.setCreateTime(null);
         streamInfo.setModifyTime(null);
         final String path = HTTP_PATH + "/stream/update";
@@ -346,7 +344,7 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public InlongStreamInfo getStreamInfo(InlongStreamInfo streamInfo) {
+    public InlongStreamResponse getStreamInfo(InlongStreamResponse streamInfo) {
         String path = HTTP_PATH + "/stream/get";
         String url = formatUrl(path);
         url += String.format("&groupId=%s&streamId=%s", streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
@@ -375,15 +373,15 @@ public class InnerInlongManagerClient {
     }
 
     public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
+        InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
+        pageRequest.setInlongGroupId(inlongGroupId);
+        String requestParams = GsonUtil.toJson(pageRequest);
+        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestParams);
         final String path = HTTP_PATH + "/stream/listAll";
-        String url = formatUrl(path);
-        InlongStreamPageRequest inlongStreamPageRequest = new InlongStreamPageRequest();
-        inlongStreamPageRequest.setInlongGroupId(inlongGroupId);
-        final String source = GsonUtil.toJson(inlongStreamPageRequest);
-        final RequestBody sourceBody = RequestBody.create(MediaType.parse("application/json"), source);
-        Request request = new Request.Builder()
+        final String url = formatUrl(path);
+        Request request = new Request.Builder().get()
                 .url(url)
-                .post(sourceBody)
+                .post(requestBody)
                 .build();
 
         Call call = httpClient.newCall(request);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 5b85758..1dc3d32 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -23,7 +23,7 @@ import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.List;
 import java.util.Map;
@@ -32,13 +32,13 @@ import java.util.Map;
 @NoArgsConstructor
 public class InnerStreamContext {
 
-    private InlongStreamInfo streamInfo;
+    private InlongStreamResponse streamInfo;
 
     private Map<String, SourceRequest> sourceRequests = Maps.newHashMap();
 
     private Map<String, SinkRequest> sinkRequests = Maps.newHashMap();
 
-    public InnerStreamContext(InlongStreamInfo streamInfo) {
+    public InnerStreamContext(InlongStreamResponse streamInfo) {
         this.streamInfo = streamInfo;
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4a61ca7..c608960 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -46,7 +46,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
@@ -103,9 +103,9 @@ public class InlongParser {
                 }.getType());
     }
 
-    public static InlongStreamInfo parseStreamInfo(Response response) {
+    public static InlongStreamResponse parseStreamInfo(Response response) {
         Object data = response.getData();
-        return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
+        return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamResponse.class);
     }
 
     public static List<FullStreamResponse> parseStreamList(Response response) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 28a138c..dccbdfb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
 import java.nio.charset.Charset;
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
 
 public class InlongStreamSinkTransfer {
 
-    public static SinkRequest createSinkRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+    public static SinkRequest createSinkRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
         SinkType sinkType = streamSink.getSinkType();
         SinkRequest sinkRequest;
         if (sinkType == SinkType.HIVE) {
@@ -84,7 +84,7 @@ public class InlongStreamSinkTransfer {
         return streamSinkResult;
     }
 
-    private static SinkRequest createClickHouseRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+    private static SinkRequest createClickHouseRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
         ClickHouseSinkRequest clickHouseSinkRequest = new ClickHouseSinkRequest();
         ClickHouseSink clickHouseSink = (ClickHouseSink) streamSink;
         clickHouseSinkRequest.setSinkName(clickHouseSink.getSinkName());
@@ -157,7 +157,7 @@ public class InlongStreamSinkTransfer {
 
     }
 
-    private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+    private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
         KafkaSinkRequest kafkaSinkRequest = new KafkaSinkRequest();
         KafkaSink kafkaSink = (KafkaSink) streamSink;
         kafkaSinkRequest.setSinkName(streamSink.getSinkName());
@@ -200,7 +200,7 @@ public class InlongStreamSinkTransfer {
         return kafkaSink;
     }
 
-    private static HiveSinkRequest createHiveRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+    private static HiveSinkRequest createHiveRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
         HiveSinkRequest hiveSinkRequest = new HiveSinkRequest();
         HiveSink hiveSink = (HiveSink) streamSink;
         hiveSinkRequest.setSinkName(streamSink.getSinkName());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 92a8e7a..f90d995 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -38,7 +38,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.Arrays;
 
@@ -47,7 +47,7 @@ import java.util.Arrays;
  */
 public class InlongStreamSourceTransfer {
 
-    public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
+    public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamResponse streamInfo) {
         SourceType sourceType = streamSource.getSourceType();
         switch (sourceType) {
             case KAFKA:
@@ -176,11 +176,11 @@ public class InlongStreamSourceTransfer {
         return binlogSource;
     }
 
-    private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
+    private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamResponse stream) {
         KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
         sourceRequest.setSourceName(kafkaSource.getSourceName());
-        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
-        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        sourceRequest.setInlongGroupId(stream.getInlongGroupId());
+        sourceRequest.setInlongStreamId(stream.getInlongStreamId());
         sourceRequest.setSourceType(kafkaSource.getSourceType().name());
         sourceRequest.setAgentIp(kafkaSource.getAgentIp());
         sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
@@ -199,7 +199,7 @@ public class InlongStreamSourceTransfer {
     }
 
     private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
-            InlongStreamInfo streamInfo) {
+            InlongStreamResponse streamInfo) {
         BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
         sourceRequest.setSourceName(binlogSource.getSourceName());
         sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 522affc..0ca083b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -23,15 +23,15 @@ import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class InlongStreamTransfer {
 
-    public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
-        InlongStreamInfo dataStreamInfo = new InlongStreamInfo();
+    public static InlongStreamResponse createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
+        InlongStreamResponse dataStreamInfo = new InlongStreamResponse();
         dataStreamInfo.setInlongGroupId(groupInfo.getInlongGroupId());
         final String streamId = "b_" + streamConf.getName();
         dataStreamInfo.setInlongStreamId(streamId);
@@ -58,7 +58,7 @@ public class InlongStreamTransfer {
 
     public static List<InlongStreamFieldInfo> createStreamFields(
             List<StreamField> fieldList,
-            InlongStreamInfo streamInfo) {
+            InlongStreamResponse streamInfo) {
         List<InlongStreamFieldInfo> fieldInfos = fieldList.stream().map(streamField -> {
             InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
             fieldInfo.setInlongStreamId(streamInfo.getInlongStreamId());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index e7f5f2f..19c17af 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -27,6 +27,7 @@ import java.util.Locale;
  */
 public enum SourceType {
 
+    AUTO_PUSH("AUTO_PUSH", null),
     FILE("FILE", TaskTypeEnum.FILE),
     SQL("SQL", TaskTypeEnum.SQL),
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java
deleted file mode 100644
index a69112a..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-
-/**
- * Inlong stream page update request,
- * <p/>including the basic info of the inlong stream and source
- */
-@Data
-@ApiModel("Inlong stream page update request")
-public class FullPageUpdateRequest {
-
-    @ApiModelProperty("Inlong stream info")
-    private InlongStreamInfo streamInfo;
-
-    @ApiModelProperty("Basic file info")
-    private SourceFileBasicInfo fileBasicInfo;
-
-    @ApiModelProperty("Basic db info")
-    private SourceDbBasicInfo dbBasicInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
index 6ddc0c7..5468f6d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
@@ -21,10 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 
 import java.util.List;
 
@@ -35,22 +32,13 @@ import java.util.List;
 @ApiModel("All request info on the inlong stream page")
 public class FullStreamRequest {
 
-    @ApiModelProperty("Inlong stream information")
-    private InlongStreamInfo streamInfo;
+    @ApiModelProperty("Inlong stream info")
+    private InlongStreamRequest streamInfo;
 
-    @ApiModelProperty("Basic information of file source")
-    private SourceFileBasicInfo fileBasicInfo;
+    @ApiModelProperty("Source info list")
+    private List<SourceRequest> sourceInfo;
 
-    @ApiModelProperty("File source details")
-    private List<SourceFileDetailInfo> fileDetailInfoList;
-
-    @ApiModelProperty("DB source basic information")
-    private SourceDbBasicInfo dbBasicInfo;
-
-    @ApiModelProperty("DB source details")
-    private List<SourceDbDetailInfo> dbDetailInfoList;
-
-    @ApiModelProperty("Sink information")
+    @ApiModelProperty("Sink info list")
     private List<SinkRequest> sinkInfo;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
index 7343ca5..8fca538 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
@@ -21,10 +21,6 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 
 import java.util.List;
@@ -37,19 +33,7 @@ import java.util.List;
 public class FullStreamResponse {
 
     @ApiModelProperty("Inlong stream information")
-    private InlongStreamInfo streamInfo;
-
-    @ApiModelProperty("Basic information of file source")
-    private SourceFileBasicInfo fileBasicInfo;
-
-    @ApiModelProperty("File source details")
-    private List<SourceFileDetailInfo> fileDetailInfoList;
-
-    @ApiModelProperty("DB source basic information")
-    private SourceDbBasicInfo dbBasicInfo;
-
-    @ApiModelProperty("DB source details")
-    private List<SourceDbDetailInfo> dbDetailInfoList;
+    private InlongStreamResponse streamInfo;
 
     @ApiModelProperty("Stream source information")
     private List<SourceResponse> sourceInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
similarity index 69%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index b82cb37..b23e42d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -17,22 +17,20 @@
 
 package org.apache.inlong.manager.common.pojo.stream;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
-import java.util.Date;
 import java.util.List;
 
 /**
- * Inlong stream info
+ * Inlong stream request.
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream info")
-public class InlongStreamInfo extends InlongStreamBaseInfo {
+@ApiModel("Inlong stream page update request")
+public class InlongStreamRequest extends InlongStreamBaseInfo {
 
     @ApiModelProperty(value = "Primary key")
     private Integer id;
@@ -47,19 +45,16 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
             notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
     private String mqResourceObj;
 
-    @ApiModelProperty(value = "Source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
-    private String dataSourceType;
-
     @ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
     private Integer storagePeriod;
 
     @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
     private String dataType;
 
-    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
     private String dataEncoding;
 
-    @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
     private String dataSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
@@ -68,7 +63,9 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
     private Integer havePredefinedFields;
 
-    @ApiModelProperty(value = "order_preserving 0: none, 1: yes")
+    @ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes",
+            notes = "Each task under this stream sends data synchronously, "
+                    + "which will affect the throughput of data collection, please choose carefully")
     private Integer syncSend;
 
     @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -86,32 +83,7 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "Names of responsible persons, separated by commas")
     private String inCharges;
 
-    @ApiModelProperty(value = "Status")
-    private Integer status;
-
-    @ApiModelProperty(value = "Previous status")
-    private Integer previousStatus;
-
-    @ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
-    private Integer isDeleted = 0;
-
-    private String creator;
-
-    private String modifier;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date createTime;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date modifyTime;
-
-    @ApiModelProperty(value = "Temporary view, string in JSON format")
-    private String tempView;
-
-    @ApiModelProperty(value = "Extended information list")
-    private List<InlongStreamExtInfo> extList;
-
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
similarity index 86%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index b82cb37..8eb80db 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -32,7 +32,7 @@ import java.util.List;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @ApiModel("Inlong stream info")
-public class InlongStreamInfo extends InlongStreamBaseInfo {
+public class InlongStreamResponse extends InlongStreamBaseInfo {
 
     @ApiModelProperty(value = "Primary key")
     private Integer id;
@@ -47,19 +47,16 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
             notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
     private String mqResourceObj;
 
-    @ApiModelProperty(value = "Source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
-    private String dataSourceType;
-
     @ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
     private Integer storagePeriod;
 
     @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
     private String dataType;
 
-    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
     private String dataEncoding;
 
-    @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
     private String dataSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
@@ -68,7 +65,9 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
     private Integer havePredefinedFields;
 
-    @ApiModelProperty(value = "order_preserving 0: none, 1: yes")
+    @ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes",
+            notes = "Each task under this stream sends data synchronously, "
+                    + "which will affect the throughput of data collection, please choose carefully")
     private Integer syncSend;
 
     @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -108,9 +107,6 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "Temporary view, string in JSON format")
     private String tempView;
 
-    @ApiModelProperty(value = "Extended information list")
-    private List<InlongStreamExtInfo> extList;
-
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
index ef7dd15..b2c8f99 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
@@ -22,7 +22,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ public class GroupResourceProcessForm extends BaseProcessForm {
 
     private String streamId;
 
-    private List<InlongStreamInfo> inlongStreamInfoList;
+    private List<InlongStreamResponse> streamList;
 
     public InlongGroupInfo getGroupInfo() {
         return groupInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
index 2571a52..a367a7f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
@@ -25,7 +25,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.List;
@@ -44,7 +44,7 @@ public class UpdateGroupProcessForm extends BaseProcessForm {
     @ApiModelProperty(value = "OperateType to define the update operation", required = true)
     private OperateType operateType;
 
-    private List<InlongStreamInfo> inlongStreamInfoList;
+    private List<InlongStreamResponse> streamList;
 
     @Override
     public void validate() throws FormValidateException {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index baec6e7..bc23767 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -227,7 +227,7 @@ public class CommonOperateService {
         // Get source info
         String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
         PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
-        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+        InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
         SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
                 groupInfo, streamInfo, sourceResponse, sourceFields);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
index ac0f294..5388a7d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
@@ -18,14 +18,14 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 
 import java.util.List;
@@ -33,18 +33,19 @@ import java.util.List;
 /**
  * Inlong stream service layer interface
  *
- * @apiNote It is associated with various DataSources, the upstream is StreamSource, and the downstream is StreamSink
+ * @apiNote It is associated with various DataSources, the upstream is StreamSource, and the downstream is
+ *         StreamSink
  */
 public interface InlongStreamService {
 
     /**
-     * Save inlong stream information
+     * Save inlong stream information.
      *
-     * @param streamInfo Basic inlong stream information
-     * @param operator Edit person's name
-     * @return Inlong stream id after successful save
+     * @param request Inlong stream information.
+     * @param operator The name of operator.
+     * @return Id after successful save.
      */
-    Integer save(InlongStreamInfo streamInfo, String operator);
+    Integer save(InlongStreamRequest request, String operator);
 
     /**
      * Query the details of the specified inlong stream
@@ -53,7 +54,7 @@ public interface InlongStreamService {
      * @param streamId Inlong stream id
      * @return inlong stream details
      */
-    InlongStreamInfo get(String groupId, String streamId);
+    InlongStreamResponse get(String groupId, String streamId);
 
     /**
      * Query whether the inlong stream ID exists
@@ -75,11 +76,11 @@ public interface InlongStreamService {
     /**
      * InlongStream info that needs to be modified
      *
-     * @param streamInfo inlong stream information that needs to be modified
+     * @param request inlong stream info that needs to be modified
      * @param operator Edit person's name
      * @return whether succeed
      */
-    boolean update(InlongStreamInfo streamInfo, String operator);
+    Boolean update(InlongStreamRequest request, String operator);
 
     /**
      * Delete the specified inlong stream
@@ -89,7 +90,7 @@ public interface InlongStreamService {
      * @param operator Edit person's name
      * @return whether succeed
      */
-    boolean delete(String groupId, String streamId, String operator);
+    Boolean delete(String groupId, String streamId, String operator);
 
     /**
      * Logically delete all inlong streams under the specified groupId
@@ -98,7 +99,7 @@ public interface InlongStreamService {
      * @param operator Edit person's name
      * @return whether succeed
      */
-    boolean logicDeleteAll(String groupId, String operator);
+    Boolean logicDeleteAll(String groupId, String operator);
 
     /**
      * Obtain the flow of inlong stream according to groupId
@@ -145,7 +146,7 @@ public interface InlongStreamService {
      * @apiNote The data source details and data sink information are modified separately,
      *         not in this all modification interface
      */
-    boolean updateAll(FullPageUpdateRequest updateInfo, String operator);
+    boolean updateAll(InlongStreamRequest updateInfo, String operator);
 
     /**
      * According to the group id, query the number of valid inlong streams belonging to this service
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 7ce1040..70dab08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.core.impl;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
@@ -193,9 +193,9 @@ public class InlongGroupProcessOperation {
         if (OperateType.RESTART == operateType) {
             List<InlongStreamEntity> inlongStreamEntityList =
                     streamMapper.selectByGroupId(groupInfo.getInlongGroupId());
-            List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
-                    InlongStreamInfo::new);
-            form.setInlongStreamInfoList(inlongStreamInfoList);
+            List<InlongStreamResponse> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+                    InlongStreamResponse::new);
+            form.setStreamList(streamList);
         }
         form.setGroupInfo(groupInfo);
         form.setOperateType(operateType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 8315922..68f24c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -31,16 +31,17 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -66,6 +67,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -97,11 +99,11 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public Integer save(InlongStreamInfo streamInfo, String operator) {
-        LOGGER.debug("begin to save inlong stream info={}", streamInfo);
-        Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
-        String groupId = streamInfo.getInlongGroupId();
-        String streamId = streamInfo.getInlongStreamId();
+    public Integer save(InlongStreamRequest request, String operator) {
+        LOGGER.debug("begin to save inlong stream info={}", request);
+        Preconditions.checkNotNull(request, "inlong stream info is empty");
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
@@ -114,29 +116,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             LOGGER.error("inlong stream id [{}] has already exists", streamId);
             throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
         }
-        if (StringUtils.isEmpty(streamInfo.getMqResourceObj())) {
-            streamInfo.setMqResourceObj(streamId);
+        if (StringUtils.isEmpty(request.getMqResourceObj())) {
+            request.setMqResourceObj(streamId);
         }
         // Processing inlong stream
-        InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(streamInfo, InlongStreamEntity::new);
-        Date date = new Date();
+        InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
         streamEntity.setStatus(EntityStatus.STREAM_NEW.getCode());
-        streamEntity.setModifier(operator);
-        streamEntity.setCreateTime(date);
+        streamEntity.setCreator(operator);
+        streamEntity.setCreateTime(new Date());
 
         streamMapper.insertSelective(streamEntity);
-
-        // Processing extended information
-        this.saveExt(groupId, streamId, streamInfo.getExtList(), date);
-        // WorkflowProcess data source fields
-        this.saveField(groupId, streamId, streamInfo.getFieldList());
+        this.saveField(groupId, streamId, request.getFieldList());
 
         LOGGER.info("success to save inlong stream info for groupId={}", groupId);
         return streamEntity.getId();
     }
 
     @Override
-    public InlongStreamInfo get(String groupId, String streamId) {
+    public InlongStreamResponse get(String groupId, String streamId) {
         LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
@@ -147,11 +144,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
 
-        InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
-        this.setStreamExtAndField(groupId, streamId, streamInfo);
+        InlongStreamResponse response = CommonBeanUtils.copyProperties(streamEntity, InlongStreamResponse::new);
+        List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, streamId);
+        response.setFieldList(streamFields);
 
         LOGGER.info("success to get inlong stream for groupId={}", groupId);
-        return streamInfo;
+        return response;
     }
 
     @Override
@@ -163,20 +161,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     /**
      * Query and set the extended information and data source fields of the inlong stream
-     *
-     * @param groupId Inlong group id
-     * @param streamId Inlong stream id
-     * @param streamInfo Inlong stream that needs to be filled
      */
-    private void setStreamExtAndField(String groupId, String streamId, InlongStreamInfo streamInfo) {
-        List<InlongStreamExtEntity> extEntityList = streamExtMapper.selectByIdentifier(groupId, streamId);
-        if (CollectionUtils.isNotEmpty(extEntityList)) {
-            streamInfo.setExtList(CommonBeanUtils.copyListProperties(extEntityList, InlongStreamExtInfo::new));
-        }
+    private List<InlongStreamFieldInfo> getStreamFields(String groupId, String streamId) {
         List<InlongStreamFieldEntity> fieldEntityList = streamFieldMapper.selectByIdentifier(groupId, streamId);
-        if (CollectionUtils.isNotEmpty(fieldEntityList)) {
-            streamInfo.setFieldList(CommonBeanUtils.copyListProperties(fieldEntityList, InlongStreamFieldInfo::new));
+        if (CollectionUtils.isEmpty(fieldEntityList)) {
+            return Collections.emptyList();
         }
+        return CommonBeanUtils.copyListProperties(fieldEntityList, InlongStreamFieldInfo::new);
     }
 
     @Override
@@ -217,34 +208,34 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean update(InlongStreamInfo streamInfo, String operator) {
-        LOGGER.debug("begin to update inlong stream info={}", streamInfo);
-        Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
-        String groupId = streamInfo.getInlongGroupId();
+    public Boolean update(InlongStreamRequest request, String operator) {
+        LOGGER.debug("begin to update inlong stream info={}", request);
+        Preconditions.checkNotNull(request, "inlong stream request is empty");
+        String groupId = request.getInlongGroupId();
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-        String streamId = streamInfo.getInlongStreamId();
+        String streamId = request.getInlongStreamId();
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
         // Check if it can be modified
         InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
 
-        // Add if it doesn't exist, modify if it exists
+        // Make sure the stream exists
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         if (streamEntity == null) {
-            this.save(streamInfo, operator);
-        } else {
-            // Check whether the current inlong group status supports modification
-            this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, streamInfo);
+            LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
+            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
+        }
 
-            CommonBeanUtils.copyProperties(streamInfo, streamEntity, true);
-            streamEntity.setModifier(operator);
-            streamEntity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
-            streamMapper.updateByIdentifierSelective(streamEntity);
+        // Check whether the current inlong group status supports modification
+        this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, request);
 
-            // Update extended information, field information
-            this.updateExt(groupId, streamId, streamInfo.getExtList());
-            this.updateField(groupId, streamId, streamInfo.getFieldList());
-        }
+        CommonBeanUtils.copyProperties(request, streamEntity, true);
+        streamEntity.setModifier(operator);
+        streamEntity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
+        streamMapper.updateByIdentifierSelective(streamEntity);
+
+        // Update field information
+        this.updateField(groupId, streamId, request.getFieldList());
 
         LOGGER.info("success to update inlong stream for groupId={}", groupId);
         return true;
@@ -252,13 +243,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean delete(String groupId, String streamId, String operator) {
+    public Boolean delete(String groupId, String streamId, String operator) {
         LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
         // Check if it can be deleted
-        InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
+        this.checkBizIsTempStatus(groupId);
 
         InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, streamId);
         if (entity == null) {
@@ -266,28 +257,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
 
-        // If there is an undeleted data source, the deletion fails
-        boolean dataSourceExist = hasDataSource(groupId, streamId, entity.getDataSourceType());
-        if (dataSourceExist) {
-            LOGGER.error("inlong stream has undeleted data sources, delete failed");
+        // If there is undeleted stream source, the deletion fails
+        Integer sourceCount = sourceService.getCount(groupId, streamId);
+        if (sourceCount > 0) {
+            LOGGER.error("inlong stream has undeleted sources, delete failed");
             throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
         }
 
-        // If there is undeleted data sink information, the deletion fails
+        // If there is undeleted stream sink, the deletion fails
         int sinkCount = sinkService.getCount(groupId, streamId);
         if (sinkCount > 0) {
             LOGGER.error("inlong stream has undeleted sinks, delete failed");
             throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
         }
 
-        entity.setIsDeleted(1);
+        entity.setIsDeleted(entity.getId());
         entity.setModifier(operator);
         streamMapper.updateByPrimaryKey(entity);
 
-        // To logically delete the associated extension table
-        LOGGER.debug("begin to delete inlong stream ext property, groupId={}, streamId={}", groupId, streamId);
-        streamExtMapper.logicDeleteAllByIdentifier(groupId, streamId);
-
         // Logically delete the associated field table
         LOGGER.debug("begin to delete inlong stream field, streamId={}", streamId);
         streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
@@ -298,7 +285,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean logicDeleteAll(String groupId, String operator) {
+    public Boolean logicDeleteAll(String groupId, String operator) {
         LOGGER.debug("begin to delete all inlong stream by groupId={}", groupId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
@@ -317,14 +304,9 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             streamMapper.updateByIdentifierSelective(entity);
 
             String streamId = entity.getInlongStreamId();
-            // To logically delete the associated extension table
-            streamExtMapper.logicDeleteAllByIdentifier(groupId, streamId);
-            // Logically delete the associated field table
+            // Logically delete the associated field, source and sink info
             streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
-            // Tombstone the associated data source
-            sourceFileService.logicDeleteAllByIdentifier(groupId, streamId, operator);
-            sourceDbService.logicDeleteAllByIdentifier(groupId, streamId, operator);
-            // Logical deletion of associated data sink information
+            sourceService.logicDeleteAll(groupId, streamId, operator);
             sinkService.logicDeleteAll(groupId, streamId, operator);
         }
 
@@ -376,36 +358,23 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
         }
         Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
-        InlongStreamInfo streamInfo = fullStreamRequest.getStreamInfo();
-        Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
+        InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
+        Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
 
         // Check whether it can be added: check by lower-level specific services
         // this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
 
         // 1. Save inlong stream
-        this.save(streamInfo, operator);
+        this.save(streamRequest, operator);
 
-        // 2.1 Save file data source information
-        if (fullStreamRequest.getFileBasicInfo() != null) {
-            sourceFileService.saveBasic(fullStreamRequest.getFileBasicInfo(), operator);
-        }
-        if (CollectionUtils.isNotEmpty(fullStreamRequest.getFileDetailInfoList())) {
-            for (SourceFileDetailInfo detailInfo : fullStreamRequest.getFileDetailInfoList()) {
-                sourceFileService.saveDetail(detailInfo, operator);
-            }
-        }
-
-        // 2.2 Save DB data source information
-        if (fullStreamRequest.getDbBasicInfo() != null) {
-            sourceDbService.saveBasic(fullStreamRequest.getDbBasicInfo(), operator);
-        }
-        if (CollectionUtils.isNotEmpty(fullStreamRequest.getDbDetailInfoList())) {
-            for (SourceDbDetailInfo detailInfo : fullStreamRequest.getDbDetailInfoList()) {
-                sourceDbService.saveDetail(detailInfo, operator);
+        // 2. Save source info
+        if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
+            for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
+                sourceService.save(source, operator);
             }
         }
 
-        // 3. Save data sink information
+        // 3. Save sink info
         if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
             for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
                 sinkService.save(sinkInfo, operator);
@@ -425,7 +394,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
 
         // Check if it can be added
-        InlongStreamInfo firstStream = fullStreamRequestList.get(0).getStreamInfo();
+        InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
         Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
         String groupId = firstStream.getInlongGroupId();
         this.checkBizIsTempStatus(groupId);
@@ -438,17 +407,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
         for (FullStreamRequest pageInfo : fullStreamRequestList) {
             // 1.1 Delete the inlong stream extensions and fields corresponding to groupId and streamId
-            InlongStreamInfo streamInfo = pageInfo.getStreamInfo();
+            InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
             String streamId = streamInfo.getInlongStreamId();
-
-            streamExtMapper.deleteAllByIdentifier(groupId, streamId);
             streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
 
-            // 2. Delete file data source, DB data source information
-            sourceFileService.deleteAllByIdentifier(groupId, streamId);
-            sourceDbService.deleteAllByIdentifier(groupId, streamId);
+            // 2. Delete all stream source
+            sourceService.deleteAll(groupId, streamId, operator);
 
-            // 3. Delete data sink information
+            // 3. Delete all stream sink
             sinkService.deleteAll(groupId, streamId, operator);
 
             // 4. Save the inlong stream of this batch
@@ -477,16 +443,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<InlongStreamEntity> page = (Page<InlongStreamEntity>) streamMapper.selectByCondition(request);
-        List<InlongStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamInfo::new);
+        List<InlongStreamResponse> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamResponse::new);
 
         // Convert and encapsulate the paged results
         List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
-        for (InlongStreamInfo streamInfo : streamInfoList) {
-            // 2.1 Set the extended information and field information of the inlong stream
+        for (InlongStreamResponse streamInfo : streamInfoList) {
+            // 2. Set the field information of the inlong stream
             String streamId = streamInfo.getInlongStreamId();
-            setStreamExtAndField(groupId, streamId, streamInfo);
+            List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
+            streamInfo.setFieldList(streamFields);
 
-            // 2.3 Set the inlong stream to the result sub-object
             FullStreamResponse pageInfo = new FullStreamResponse();
             pageInfo.setStreamInfo(streamInfo);
 
@@ -511,28 +477,11 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean updateAll(FullPageUpdateRequest updateInfo, String operator) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to update all stream page info: {}", updateInfo);
-        }
-        Preconditions.checkNotNull(updateInfo, "updateInfo is empty");
-        Preconditions.checkNotNull(updateInfo.getStreamInfo(), "inlong stream info is empty");
-
-        // 1. Modify the inlong stream (inlong stream information cannot be empty)
-        this.update(updateInfo.getStreamInfo(), operator);
-
-        // 2. Modify the basic information of the file data source
-        if (updateInfo.getFileBasicInfo() != null) {
-            sourceFileService.updateBasic(updateInfo.getFileBasicInfo(), operator);
-        }
-
-        // 3. Save the basic information of the DB data source
-        if (updateInfo.getDbBasicInfo() != null) {
-            sourceDbService.updateBasic(updateInfo.getDbBasicInfo(), operator);
-        }
-
-        // TODO Update stream source info
+    public boolean updateAll(InlongStreamRequest request, String operator) {
+        LOGGER.info("begin to update all stream page info: " + request);
+        Preconditions.checkNotNull(request, "request is empty");
 
+        this.update(request, operator);
         LOGGER.info("success to update all stream page info");
         return true;
     }
@@ -720,16 +669,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
      *
      * @param groupStatus Inlong group status
      * @param streamEntity Original inlong stream entity
-     * @param streamInfo New inlong stream information
+     * @param request New inlong stream information
      */
-    private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamInfo streamInfo) {
-        if (streamEntity == null || streamInfo == null) {
+    private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
+        if (streamEntity == null || request == null) {
             return;
         }
 
         // Fields that are not allowed to be modified when the inlong group [configuration is successful]
         if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupStatus)) {
-            checkUpdatedFields(streamEntity, streamInfo);
+            checkUpdatedFields(streamEntity, request);
         }
 
         // Inlong group [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
@@ -739,14 +688,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
                 EntityStatus.GROUP_APPROVE_REJECTED.getCode(),
                 EntityStatus.GROUP_CONFIG_FAILED.getCode());
         if (statusList.contains(groupStatus)) {
-            String groupId = streamInfo.getInlongGroupId();
-            String streamId = streamInfo.getInlongStreamId();
-            // Whether there is an undeleted data source
-            boolean dataSourceExist = hasDataSource(groupId, streamId, streamInfo.getDataSourceType());
-            // Whether there is undeleted stream sink
+            String groupId = request.getInlongGroupId();
+            String streamId = request.getInlongStreamId();
+            // Whether there is undeleted stream source and sink
+            int sourceCount = sourceService.getCount(groupId, streamId);
             int sinkCount = sinkService.getCount(groupId, streamId);
-            if (dataSourceExist || sinkCount > 0) {
-                checkUpdatedFields(streamEntity, streamInfo);
+            if (sourceCount > 0 || sinkCount > 0) {
+                checkUpdatedFields(streamEntity, request);
             }
         }
     }
@@ -754,24 +702,18 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     /**
      * Check that groupId, streamId, and dataSourceType are not allowed to be modified
      */
-    private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamInfo streamInfo) {
-        String newGroupId = streamInfo.getInlongGroupId();
+    private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamRequest request) {
+        String newGroupId = request.getInlongGroupId();
         if (newGroupId != null && !newGroupId.equals(streamEntity.getInlongGroupId())) {
             LOGGER.error("current status was not allowed to update inlong group id");
             throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
         }
 
-        String newDsid = streamInfo.getInlongStreamId();
-        if (newDsid != null && !newDsid.equals(streamEntity.getInlongStreamId())) {
+        String newStreamId = request.getInlongStreamId();
+        if (newStreamId != null && !newStreamId.equals(streamEntity.getInlongStreamId())) {
             LOGGER.error("current status was not allowed to update inlong stream id");
             throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
         }
-
-        String newSourceType = streamInfo.getDataSourceType();
-        if (newSourceType != null && !newSourceType.equals(streamEntity.getDataSourceType())) {
-            LOGGER.error("current status was not allowed to update data source type");
-            throw new BusinessException(ErrorCodeEnum.STREAM_SOURCE_UPDATE_NOT_ALLOWED);
-        }
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 16ccb9d..0d5f45b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -125,7 +125,7 @@ public class PushSortConfigListener implements SortOperateListener {
             List<StreamSinkFieldEntity> fieldList) {
         DeserializationInfo deserializationInfo = null;
         String groupId = groupInfo.getInlongGroupId();
-        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+        InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
 
         boolean isDbType = Constant.DATA_SOURCE_DB.equals(streamInfo.getDataType());
         if (!isDbType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index cea3110..a2211b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
@@ -48,7 +48,7 @@ public class SerializationUtils {
      * Create deserialization info
      */
     public static DeserializationInfo createDeserialInfo(SourceResponse sourceResponse,
-            InlongStreamInfo streamInfo) {
+            InlongStreamResponse streamInfo) {
         SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
         switch (sourceType) {
             case BINLOG:
@@ -87,7 +87,7 @@ public class SerializationUtils {
     /**
      * Get deserialization info for Kafka
      */
-    private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamInfo stream) {
+    private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamResponse stream) {
         String serializationType = source.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
@@ -135,7 +135,8 @@ public class SerializationUtils {
     /**
      * Get deserialization info for File
      */
-    private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
+    private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse,
+            InlongStreamResponse streamInfo) {
         String serializationType = sourceResponse.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 13a74c8..66838ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -61,7 +61,7 @@ public class SourceInfoUtils {
      * Create source info for DataFlowInfo.
      */
     public static SourceInfo createSourceInfo(PulsarClusterInfo pulsarCluster, String masterAddress,
-            ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamResponse streamInfo,
             SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
 
         String mqType = groupInfo.getMiddlewareType();
@@ -84,7 +84,7 @@ public class SourceInfoUtils {
      * Create source info for Pulsar
      */
     private static SourceInfo createPulsarSourceInfo(PulsarClusterInfo pulsarCluster, ClusterBean clusterBean,
-            InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            InlongGroupInfo groupInfo, InlongStreamResponse streamInfo,
             DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
         String topicName = streamInfo.getMqResourceObj();
         InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
index b693a65..37b5e30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -69,9 +69,9 @@ public class StartCreateGroupProcessListener implements ProcessEventListener {
         processForm.setGroupInfo(groupService.get(groupId));
         String username = context.getApplicant();
         List<InlongStreamEntity> inlongStreamEntityList = streamMapper.selectByGroupId(groupId);
-        List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
-                InlongStreamInfo::new);
-        processForm.setInlongStreamInfoList(inlongStreamInfoList);
+        List<InlongStreamResponse> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+                InlongStreamResponse::new);
+        processForm.setStreamList(streamList);
         workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
index 5eb136c..88b98a8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.junit.Assert;
 import org.junit.Test;
@@ -44,11 +45,11 @@ public class InlongStreamServiceTest {
      * Test save inlong stream
      */
     public Integer saveInlongStream(String groupId, String streamId, String operator) {
-        InlongStreamInfo streamInfo;
+        ;
         try {
-            streamInfo = streamService.get(groupId, streamId);
-            if (streamInfo != null) {
-                return streamInfo.getId();
+            InlongStreamResponse response = streamService.get(groupId, streamId);
+            if (response != null) {
+                return response.getId();
             }
         } catch (Exception e) {
             // ignore
@@ -56,12 +57,12 @@ public class InlongStreamServiceTest {
 
         groupServiceTest.saveGroup(globalGroupName, operator);
 
-        streamInfo = new InlongStreamInfo();
-        streamInfo.setInlongGroupId(groupId);
-        streamInfo.setInlongStreamId(streamId);
-        streamInfo.setDataEncoding("UTF-8");
+        InlongStreamRequest request = new InlongStreamRequest();
+        request.setInlongGroupId(groupId);
+        request.setInlongStreamId(streamId);
+        request.setDataEncoding("UTF-8");
 
-        return streamService.save(streamInfo, operator);
+        return streamService.save(request, operator);
     }
 
     @Test
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 539f256..367baed 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
@@ -51,10 +51,10 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
     private StreamSourceService streamSourceService;
 
     public Integer createBinlogSource(InlongGroupInfo groupInfo) {
-        final InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+        final InlongStreamResponse stream = createStreamInfo(groupInfo);
         BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
-        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
-        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        sourceRequest.setInlongGroupId(stream.getInlongGroupId());
+        sourceRequest.setInlongStreamId(stream.getInlongStreamId());
         sourceRequest.setSourceName("binlog-collect");
         return streamSourceService.save(sourceRequest, OPERATOR);
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
index cdf2ffd..653a56b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -63,7 +63,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
     @Autowired
     protected StreamSourceService streamSourceService;
 
-    public HiveSinkRequest createHiveSink(InlongStreamInfo streamInfo) {
+    public HiveSinkRequest createHiveSink(InlongStreamResponse streamInfo) {
         HiveSinkRequest hiveSinkRequest = new HiveSinkRequest();
         hiveSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         hiveSinkRequest.setSinkType("HIVE");
@@ -95,7 +95,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
         return hiveSinkRequest;
     }
 
-    public KafkaSourceRequest createKafkaSource(InlongStreamInfo streamInfo) {
+    public KafkaSourceRequest createKafkaSource(InlongStreamResponse streamInfo) {
         KafkaSourceRequest kafkaSourceRequest = new KafkaSourceRequest();
         kafkaSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         kafkaSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
@@ -112,7 +112,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
         groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
         groupInfo.setZookeeperEnabled(0);
         groupService.update(groupInfo.genRequest(), OPERATOR);
-        InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+        InlongStreamResponse streamInfo = createStreamInfo(groupInfo);
         createHiveSink(streamInfo);
         createKafkaSource(streamInfo);
         mockTaskListenerFactory();
@@ -139,7 +139,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
         groupInfo.setZookeeperEnabled(0);
         groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
         groupService.update(groupInfo.genRequest(), OPERATOR);
-        InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+        InlongStreamResponse streamInfo = createStreamInfo(groupInfo);
         createHiveSink(streamInfo);
         createKafkaSource(streamInfo);
         UpdateGroupProcessForm form = new UpdateGroupProcessForm();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 1f25978..9cbb07a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -25,7 +25,8 @@ import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskExecuteLogQuery;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -148,7 +149,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
     /**
      * Create inlong stream
      */
-    public InlongStreamInfo createStreamInfo(InlongGroupInfo inlongGroupInfo) {
+    public InlongStreamResponse createStreamInfo(InlongGroupInfo groupInfo) {
         // delete first
         try {
             streamService.delete(GROUP_ID, OPERATOR, OPERATOR);
@@ -156,22 +157,24 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
             // ignore
         }
 
-        InlongStreamInfo streamInfo = new InlongStreamInfo();
-        streamInfo.setInlongGroupId(inlongGroupInfo.getInlongGroupId());
-        streamInfo.setInlongStreamId(STREAM_ID);
-        streamInfo.setMqResourceObj(STREAM_ID);
-        streamInfo.setDataSeparator("124");
-        streamInfo.setDataEncoding(DATA_ENCODING);
-        streamInfo.setInCharges(OPERATOR);
-        streamInfo.setCreator(OPERATOR);
-        streamInfo.setFieldList(createStreamFields(inlongGroupInfo.getInlongGroupId(), STREAM_ID));
-        streamService.save(streamInfo, OPERATOR);
-        return streamInfo;
+        InlongStreamRequest request = new InlongStreamRequest();
+        request.setInlongGroupId(groupInfo.getInlongGroupId());
+        request.setInlongStreamId(STREAM_ID);
+        request.setMqResourceObj(STREAM_ID);
+        request.setDataSeparator("124");
+        request.setDataEncoding(DATA_ENCODING);
+        request.setInCharges(OPERATOR);
+        request.setFieldList(createStreamFields(groupInfo.getInlongGroupId(), STREAM_ID));
+        streamService.save(request, OPERATOR);
+
+        return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
     }
 
-    public List<InlongStreamFieldInfo> createStreamFields(String inlongGroupId, String inlongStreamId) {
+    public List<InlongStreamFieldInfo> createStreamFields(String groupId, String streamId) {
         final List<InlongStreamFieldInfo> streamFieldInfos = new ArrayList<>();
         InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
+        fieldInfo.setInlongGroupId(groupId);
+        fieldInfo.setInlongStreamId(streamId);
         fieldInfo.setFieldName("id");
         fieldInfo.setFieldType("int");
         fieldInfo.setFieldComment("idx");
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 143a441..1454268 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -24,12 +24,12 @@ import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -58,8 +58,8 @@ public class InlongStreamController {
     @RequestMapping(value = "/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save inlong stream info")
-    public Response<Integer> save(@RequestBody InlongStreamInfo streamInfo) {
-        int result = streamService.save(streamInfo, LoginUserUtils.getLoginUserDetail().getUserName());
+    public Response<Integer> save(@RequestBody InlongStreamRequest request) {
+        int result = streamService.save(request, LoginUserUtils.getLoginUserDetail().getUserName());
         return Response.success(result);
     }
 
@@ -84,7 +84,7 @@ public class InlongStreamController {
             @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
     })
-    public Response<InlongStreamInfo> get(@RequestParam String groupId, @RequestParam String streamId) {
+    public Response<InlongStreamResponse> get(@RequestParam String groupId, @RequestParam String streamId) {
         return Response.success(streamService.get(groupId, streamId));
     }
 
@@ -105,16 +105,16 @@ public class InlongStreamController {
     @RequestMapping(value = "/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update inlong stream info")
-    public Response<Boolean> update(@RequestBody InlongStreamInfo streamInfo) {
+    public Response<Boolean> update(@RequestBody InlongStreamRequest request) {
         String username = LoginUserUtils.getLoginUserDetail().getUserName();
-        return Response.success(streamService.update(streamInfo, username));
+        return Response.success(streamService.update(request, username));
     }
 
     @RequestMapping(value = "/updateAll", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update inlong stream page info")
-    public Response<Boolean> updateAll(@RequestBody FullPageUpdateRequest updateInfo) {
-        boolean result = streamService.updateAll(updateInfo, LoginUserUtils.getLoginUserDetail().getUserName());
+    public Response<Boolean> updateAll(@RequestBody InlongStreamRequest request) {
+        boolean result = streamService.updateAll(request, LoginUserUtils.getLoginUserDetail().getUserName());
         return Response.success(result);
     }
 
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index aaf3e7d..ac33d7e 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -116,6 +116,10 @@
                     <artifactId>android-json</artifactId>
                     <groupId>com.vaadin.external.google</groupId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.jayway.jsonpath</groupId>
+                    <artifactId>json-path</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -448,6 +452,7 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-test</artifactId>
                 <version>${spring-boot.version}</version>
+                <scope>test</scope>
             </dependency>
             <dependency>
                 <groupId>org.springframework</groupId>