You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/27 02:33:26 UTC

[inlong] branch master updated: [INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1363c112f [INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)
1363c112f is described below

commit 1363c112f8344ca7a65bdb1ff08023a412c8744b
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 27 10:33:20 2022 +0800

    [INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)
---
 .../inlong/manager/client/ut/Kafka2HiveTest.java   | 199 ++++++++++-----------
 .../inlong/manager/client/cli/DescribeCommand.java |   6 +-
 .../inlong/manager/client/cli/ListCommand.java     |   9 +-
 .../manager/client/api/impl/InlongGroupImpl.java   |   9 +-
 .../manager/client/api/impl/InlongStreamImpl.java  |  17 +-
 .../client/api/inner/InnerInlongManagerClient.java |   9 +-
 .../client/api/service/InlongStreamApi.java        |   3 +-
 .../client/api/impl/InlongGroupImplTest.java       |   3 +
 .../client/api/impl/InlongStreamImplTest.java      |   2 +-
 .../api/inner/InnerInlongManagerClientTest.java    | 153 ++++++++--------
 .../common/pojo/stream/FullStreamRequest.java      |  44 -----
 .../common/pojo/stream/FullStreamResponse.java     |  50 ------
 .../common/pojo/stream/InlongStreamInfo.java       |  11 ++
 .../common/pojo/stream/InlongStreamResponse.java   |   2 +
 .../manager/service/core/InlongStreamService.java  |  24 +--
 .../service/core/impl/InlongStreamServiceImpl.java |  87 +--------
 .../web/controller/InlongStreamController.java     |  24 +--
 17 files changed, 221 insertions(+), 431 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index ac804b901..b30356ec2 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -243,90 +242,90 @@ class Kafka2HiveTest extends BaseTest {
                         )
         );
 
-        Response<PageInfo<FullStreamResponse>> fullStreamResponsePage = Response.success(
-                new PageInfo<>(
-                        Lists.newArrayList(FullStreamResponse.builder()
-                                .streamInfo(InlongStreamInfo.builder()
-                                        .id(8)
-                                        .inlongGroupId(GROUP_ID)
-                                        .inlongStreamId(STREAM_ID)
-                                        .name(STREAM_ID)
-                                        .mqResource("test_topic")
-                                        .dataEncoding("UTF-8")
-                                        .dataSeparator("|")
-                                        .syncSend(1)
-                                        .dailyRecords(10)
-                                        .dailyStorage(10)
-                                        .peakRecords(1000)
-                                        .maxLength(10240)
-                                        .storagePeriod(1)
-                                        .status(120)
-                                        .creator("admin")
-                                        .modifier("admin")
-                                        .createTime(new Date())
-                                        .modifyTime(new Date())
-                                        .fieldList(createStreamFields())
+        InlongStreamInfo streamInfo = InlongStreamInfo.builder()
+                .id(8)
+                .inlongGroupId(GROUP_ID)
+                .inlongStreamId(STREAM_ID)
+                .name(STREAM_ID)
+                .mqResource("test_topic")
+                .dataEncoding("UTF-8")
+                .dataSeparator("|")
+                .syncSend(1)
+                .dailyRecords(10)
+                .dailyStorage(10)
+                .peakRecords(1000)
+                .maxLength(10240)
+                .storagePeriod(1)
+                .status(120)
+                .creator("admin")
+                .modifier("admin")
+                .createTime(new Date())
+                .modifyTime(new Date())
+                .fieldList(createStreamFields())
+                .build();
+
+        ArrayList<KafkaSource> kafkaSources = Lists.newArrayList(
+                KafkaSource.builder()
+                        .id(6)
+                        .topic(TOPIC)
+                        .bootstrapServers("{kafka.bootstrap}")
+                        .inlongGroupId(GROUP_ID)
+                        .inlongStreamId(STREAM_ID)
+                        .sourceType("KAFKA")
+                        .sourceName("{kafka.source.name}")
+                        .serializationType("json")
+                        .version(1)
+                        .status(110)
+                        .creator("admin")
+                        .modifier("admin")
+                        .createTime(new Date())
+                        .modifyTime(new Date())
+                        .build()
+        );
+
+        ArrayList<HiveSink> hiveSinks = Lists.newArrayList(
+                HiveSink.builder()
+                        .id(6)
+                        .inlongStreamId(STREAM_ID)
+                        .inlongGroupId(GROUP_ID)
+                        .jdbcUrl("jdbc:hive2://{ip:port}")
+                        .dbName("test_db")
+                        .tableName("test_table")
+                        .dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
+                        .fileFormat("TextFile")
+                        .dataEncoding("UTF-8")
+                        .dataSeparator("|")
+                        .sinkType("HIVE")
+                        .sinkName("sink_name")
+                        .enableCreateResource(1)
+                        .status(110)
+                        .creator("admin")
+                        .modifier("admin")
+                        .dataFormat(DataFormat.NONE)
+                        .sinkFieldList(Lists.newArrayList(
+                                SinkField.builder()
+                                        .id(17)
+                                        .fieldName("age")
+                                        .fieldType("INT")
+                                        .fieldComment("age")
+                                        .sourceFieldName("age")
+                                        .sourceFieldType("INT")
+                                        .build(),
+                                SinkField.builder()
+                                        .id(18)
+                                        .fieldName("name")
+                                        .fieldType("STRING")
+                                        .fieldComment("name")
+                                        .sourceFieldName("name")
+                                        .sourceFieldType("STRING")
                                         .build()
-                                )
-                                .sourceInfo(Lists.newArrayList(
-                                        KafkaSource.builder()
-                                                .id(6)
-                                                .topic(TOPIC)
-                                                .bootstrapServers("{kafka.bootstrap}")
-                                                .inlongGroupId(GROUP_ID)
-                                                .inlongStreamId(STREAM_ID)
-                                                .sourceType("KAFKA")
-                                                .sourceName("{kafka.source.name}")
-                                                .serializationType("json")
-                                                .version(1)
-                                                .status(110)
-                                                .creator("admin")
-                                                .modifier("admin")
-                                                .createTime(new Date())
-                                                .modifyTime(new Date())
-                                                .build()
-                                ))
-                                .sinkInfo(Lists.newArrayList(
-                                        HiveSink.builder()
-                                                .id(6)
-                                                .inlongStreamId(STREAM_ID)
-                                                .inlongGroupId(GROUP_ID)
-                                                .jdbcUrl("jdbc:hive2://{ip:port}")
-                                                .dbName("test_db")
-                                                .tableName("test_table")
-                                                .dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
-                                                .fileFormat("TextFile")
-                                                .dataEncoding("UTF-8")
-                                                .dataSeparator("|")
-                                                .sinkType("HIVE")
-                                                .sinkName("sink_name")
-                                                .enableCreateResource(1)
-                                                .status(110)
-                                                .creator("admin")
-                                                .modifier("admin")
-                                                .dataFormat(DataFormat.NONE)
-                                                .sinkFieldList(Lists.newArrayList(
-                                                        SinkField.builder()
-                                                                .id(17)
-                                                                .fieldName("age")
-                                                                .fieldType("INT")
-                                                                .fieldComment("age")
-                                                                .sourceFieldName("age")
-                                                                .sourceFieldType("INT")
-                                                                .build(),
-                                                        SinkField.builder()
-                                                                .id(18)
-                                                                .fieldName("name")
-                                                                .fieldType("STRING")
-                                                                .fieldComment("name")
-                                                                .sourceFieldName("name")
-                                                                .sourceFieldType("STRING")
-                                                                .build()
-                                                ))
-                                                .build()
-                                ))
-                                .build())
-                )
+                        ))
+                        .build());
+        streamInfo.setSourceList(kafkaSources);
+        streamInfo.setSinkList(hiveSinks);
+
+        Response<PageInfo<InlongStreamInfo>> fullStreamResponsePage = Response.success(
+                new PageInfo<>(Lists.newArrayList(streamInfo))
         );
         stubFor(
                 post(urlMatching(MANAGER_URL_PREFIX + "/stream/listAll.*"))
@@ -382,22 +381,6 @@ class Kafka2HiveTest extends BaseTest {
         );
     }
 
-    @Test
-    void testCreateGroupForHive() {
-        Assertions.assertDoesNotThrow(() -> {
-            InlongGroup group = inlongClient.forGroup(groupInfo);
-            InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
-            streamBuilder.fields(createStreamFields());
-            streamBuilder.source(createKafkaSource());
-            streamBuilder.sink(createHiveSink());
-            streamBuilder.initOrUpdate();
-            // start group
-            InlongGroupContext inlongGroupContext = group.init();
-            Assertions.assertNotNull(inlongGroupContext);
-        });
-
-    }
-
     private static KafkaSource createKafkaSource() {
         KafkaSource kafkaSource = new KafkaSource();
         kafkaSource.setBootstrapServers("127.0.0.1");
@@ -413,4 +396,20 @@ class Kafka2HiveTest extends BaseTest {
                 new StreamField(1, FieldType.INT.toString(), "age", null, null)
         );
     }
+
+    @Test
+    void testCreateGroupForHive() {
+        Assertions.assertDoesNotThrow(() -> {
+            InlongGroup group = inlongClient.forGroup(groupInfo);
+            InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
+            streamBuilder.fields(createStreamFields());
+            streamBuilder.source(createKafkaSource());
+            streamBuilder.sink(createHiveSink());
+            streamBuilder.initOrUpdate();
+            // start group
+            InlongGroupContext inlongGroupContext = group.init();
+            Assertions.assertNotNull(inlongGroupContext);
+        });
+
+    }
 }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
index c81a4f695..f8032cc6d 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
 import java.io.IOException;
 import java.util.List;
@@ -79,8 +79,8 @@ public class DescribeCommand extends AbstractCommand {
         @Override
         void run() {
             try {
-                List<FullStreamResponse> fullStreamResponseList = managerClient.listStreamInfo(groupId);
-                fullStreamResponseList.forEach(response -> PrintUtils.printJson(response.getStreamInfo()));
+                List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+                streamInfos.forEach(PrintUtils::printJson);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
             }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 2a8a46291..def185ff7 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -33,12 +33,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Get main information of resources.
@@ -86,11 +84,8 @@ public class ListCommand extends AbstractCommand {
         @Override
         void run() {
             try {
-                List<FullStreamResponse> streamResponseList = managerClient.listStreamInfo(groupId);
-                List<InlongStreamInfo> streamInfoList = streamResponseList.stream()
-                        .map(FullStreamResponse::getStreamInfo)
-                        .collect(Collectors.toList());
-                PrintUtils.print(streamInfoList, StreamInfo.class);
+                List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+                PrintUtils.print(streamInfos, StreamInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
             }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 3e7447f2c..521ea0d19 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -305,12 +304,12 @@ public class InlongGroupImpl implements InlongGroup {
     }
 
     private List<InlongStream> fetchInlongStreams(String groupId) {
-        List<FullStreamResponse> streamResponses = managerClient.listStreamInfo(groupId);
-        if (CollectionUtils.isEmpty(streamResponses)) {
+        List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+        if (CollectionUtils.isEmpty(streamInfos)) {
             return null;
         }
-        return streamResponses.stream()
-                .map(response -> new InlongStreamImpl(response, managerClient))
+        return streamInfos.stream()
+                .map(streamInfo -> new InlongStreamImpl(streamInfo, managerClient))
                 .collect(Collectors.toList());
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 56b636666..870810301 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelation;
@@ -70,8 +69,7 @@ public class InlongStreamImpl implements InlongStream {
     /**
      * Constructor of InlongStreamImpl.
      */
-    public InlongStreamImpl(FullStreamResponse streamResponse, InnerInlongManagerClient managerClient) {
-        InlongStreamInfo streamInfo = streamResponse.getStreamInfo();
+    public InlongStreamImpl(InlongStreamInfo streamInfo, InnerInlongManagerClient managerClient) {
         this.managerClient = managerClient;
         this.inlongGroupId = streamInfo.getInlongGroupId();
         this.inlongStreamId = streamInfo.getInlongStreamId();
@@ -90,18 +88,19 @@ public class InlongStreamImpl implements InlongStream {
                             )
                     ).collect(Collectors.toList());
         }
-        List<StreamSink> responseList = streamResponse.getSinkInfo();
-        if (CollectionUtils.isNotEmpty(responseList)) {
-            this.streamSinks = responseList.stream()
+
+        List<? extends StreamSink> sinkInfos = streamInfo.getSinkList();
+        if (CollectionUtils.isNotEmpty(sinkInfos)) {
+            this.streamSinks = sinkInfos.stream()
                     .collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink,
                             (sink1, sink2) -> {
                                 throw new RuntimeException(String.format("duplicate sinkName:%s in stream:%s",
                                         sink1.getSinkName(), this.inlongStreamId));
                             }));
         }
-        List<StreamSource> sourceList = streamResponse.getSourceInfo();
-        if (CollectionUtils.isNotEmpty(sourceList)) {
-            this.streamSources = sourceList.stream()
+        List<? extends StreamSource> sourceInfos = streamInfo.getSourceList();
+        if (CollectionUtils.isNotEmpty(sourceInfos)) {
+            this.streamSources = sourceInfos.stream()
                     .collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource,
                             (source1, source2) -> {
                                 throw new RuntimeException(String.format("duplicate sourceName: %s in streamId: %s",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index e0f4eeb66..03979ad03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -49,7 +49,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
@@ -78,11 +77,9 @@ import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD
 @Slf4j
 public class InnerInlongManagerClient {
 
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
     protected final String host;
     protected final int port;
-
+    private final ObjectMapper objectMapper = new ObjectMapper();
     private final InlongStreamApi inlongStreamApi;
     private final InlongGroupApi inlongGroupApi;
     private final StreamSourceApi streamSourceApi;
@@ -288,11 +285,11 @@ public class InnerInlongManagerClient {
     /**
      * Get information of stream.
      */
-    public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
+    public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
         InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
         pageRequest.setInlongGroupId(inlongGroupId);
 
-        Response<PageInfo<FullStreamResponse>> response = executeHttpCall(inlongStreamApi.listStream(pageRequest));
+        Response<PageInfo<InlongStreamInfo>> response = executeHttpCall(inlongStreamApi.listStream(pageRequest));
         assertRespSuccess(response);
         return response.getData().getList();
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 237ecfd4a..ad32ef1d1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service;
 
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
@@ -46,7 +45,7 @@ public interface InlongStreamApi {
             @Query("streamId") String streamId);
 
     @POST("stream/listAll")
-    Call<Response<PageInfo<FullStreamResponse>>> listStream(@Body InlongStreamPageRequest request);
+    Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body InlongStreamPageRequest request);
 
     @GET("stream/config/log/list")
     Call<Response<PageInfo<InlongStreamConfigLogListResponse>>> getStreamLogs(@Query("inlongGroupId") String groupId,
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
index 5d3d082b8..67f9ba90a 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
@@ -23,6 +23,9 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+/**
+ * Unit test for {@link InlongGroupImpl}
+ */
 class InlongGroupImplTest {
 
     @Test
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index f08390c0b..42048155c 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 /**
- * Test class for creat inlong stream.
+ * Unit test for {@link InlongStreamImpl}
  */
 public class InlongStreamImplTest {
 
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
index 915167b8c..82786e7d9 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.es.ElasticsearchSinkListResponse;
@@ -47,6 +48,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSource;
 import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.file.FileSource;
@@ -55,7 +57,6 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -77,15 +78,14 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
 import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
 
 /**
- * Test class for InnerInlongManagerClientTest.
+ * Unit test for {@link InnerInlongManagerClient}.
  */
 @Slf4j
 class InnerInlongManagerClientTest {
 
-    public static WireMockServer wireMockServer;
-    public static InnerInlongManagerClient innerInlongManagerClient;
-
     private static final int SERVICE_PORT = 8085;
+    private static WireMockServer wireMockServer;
+    private static InnerInlongManagerClient innerInlongManagerClient;
 
     @BeforeAll
     static void setup() {
@@ -487,95 +487,92 @@ class InnerInlongManagerClientTest {
 
     @Test
     void testListStream4AllSink() {
-        FullStreamResponse fullStreamResponse = FullStreamResponse.builder()
-                .streamInfo(
-                        InlongStreamInfo.builder()
-                                .id(1)
-                                .inlongGroupId("11")
-                                .inlongStreamId("11")
-                                .fieldList(
-                                        Lists.newArrayList(
-                                                StreamField.builder()
-                                                        .id(1)
-                                                        .inlongGroupId("123")
-                                                        .inlongGroupId("11")
-                                                        .build(),
-                                                StreamField.builder()
-                                                        .id(2)
-                                                        .isMetaField(1)
-                                                        .fieldFormat("yyyy-MM-dd HH:mm:ss")
-                                                        .build()
-                                        )
-                                ).build()
-                )
-                .sourceInfo(
+        InlongStreamInfo streamInfo = InlongStreamInfo.builder()
+                .id(1)
+                .inlongGroupId("11")
+                .inlongStreamId("11")
+                .fieldList(
                         Lists.newArrayList(
-                                AutoPushSource.builder()
+                                StreamField.builder()
                                         .id(1)
-                                        .inlongStreamId("11")
+                                        .inlongGroupId("123")
                                         .inlongGroupId("11")
-                                        .sourceType("AUTO_PUSH")
-                                        .createTime(new Date())
-
-                                        .dataProxyGroup("111")
-                                        .build(),
-                                MySQLBinlogSource.builder()
-                                        .id(2)
-                                        .sourceType("BINLOG")
-                                        .user("user")
-                                        .password("pwd")
-                                        .build(),
-                                FileSource.builder()
-                                        .id(3)
-                                        .sourceType("FILE")
-                                        .agentIp("127.0.0.1")
-                                        .pattern("pattern")
                                         .build(),
-                                KafkaSource.builder()
-                                        .id(4)
-                                        .sourceType("KAFKA")
-                                        .autoOffsetReset("11")
-                                        .bootstrapServers("10.110.221.22")
-                                        .build()
-                        )
-                )
-                .sinkInfo(
-                        Lists.newArrayList(
-                                HiveSink.builder()
-                                        .sinkType("HIVE")
-                                        .id(1)
-                                        .jdbcUrl("127.0.0.1")
-                                        .build(),
-                                ClickHouseSink.builder()
-                                        .sinkType("CLICKHOUSE")
+                                StreamField.builder()
                                         .id(2)
-                                        .flushInterval(11)
-                                        .build(),
-                                IcebergSink.builder()
-                                        .sinkType("ICEBERG")
-                                        .id(3)
-                                        .dataPath("hdfs://aabb")
-                                        .build(),
-                                KafkaSink.builder()
-                                        .sinkType("KAFKA")
-                                        .id(4)
-                                        .bootstrapServers("127.0.0.1")
+                                        .isMetaField(1)
+                                        .fieldFormat("yyyy-MM-dd HH:mm:ss")
                                         .build()
                         )
                 ).build();
 
+        ArrayList<StreamSource> sourceList = Lists.newArrayList(
+                AutoPushSource.builder()
+                        .id(1)
+                        .inlongStreamId("11")
+                        .inlongGroupId("11")
+                        .sourceType("AUTO_PUSH")
+                        .createTime(new Date())
+
+                        .dataProxyGroup("111")
+                        .build(),
+                MySQLBinlogSource.builder()
+                        .id(2)
+                        .sourceType("BINLOG")
+                        .user("user")
+                        .password("pwd")
+                        .build(),
+                FileSource.builder()
+                        .id(3)
+                        .sourceType("FILE")
+                        .agentIp("127.0.0.1")
+                        .pattern("pattern")
+                        .build(),
+                KafkaSource.builder()
+                        .id(4)
+                        .sourceType("KAFKA")
+                        .autoOffsetReset("11")
+                        .bootstrapServers("10.110.221.22")
+                        .build()
+        );
+
+        ArrayList<StreamSink> sinkList = Lists.newArrayList(
+                HiveSink.builder()
+                        .sinkType("HIVE")
+                        .id(1)
+                        .jdbcUrl("127.0.0.1")
+                        .build(),
+                ClickHouseSink.builder()
+                        .sinkType("CLICKHOUSE")
+                        .id(2)
+                        .flushInterval(11)
+                        .build(),
+                IcebergSink.builder()
+                        .sinkType("ICEBERG")
+                        .id(3)
+                        .dataPath("hdfs://aabb")
+                        .build(),
+                KafkaSink.builder()
+                        .sinkType("KAFKA")
+                        .id(4)
+                        .bootstrapServers("127.0.0.1")
+                        .build()
+        );
+
+        streamInfo.setSourceList(sourceList);
+        streamInfo.setSinkList(sinkList);
+
         stubFor(
                 post(urlMatching("/api/inlong/manager/stream/listAll.*"))
                         .willReturn(
-                                okJson(JsonUtils.toJsonString(Response.success(
-                                        new PageInfo<>(Lists.newArrayList(fullStreamResponse))))
+                                okJson(JsonUtils.toJsonString(
+                                        Response.success(new PageInfo<>(Lists.newArrayList(streamInfo))))
                                 )
                         )
         );
 
-        List<FullStreamResponse> fullStreamResponses = innerInlongManagerClient.listStreamInfo("11");
-        Assertions.assertEquals(JsonUtils.toJsonString(fullStreamResponse),
-                JsonUtils.toJsonString(fullStreamResponses.get(0)));
+        List<InlongStreamInfo> streamInfos = innerInlongManagerClient.listStreamInfo("11");
+        Assertions.assertEquals(JsonUtils.toJsonString(streamInfo), JsonUtils.toJsonString(streamInfos.get(0)));
     }
 
     @Test
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
deleted file mode 100644
index 5468f6d5c..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-
-import java.util.List;
-
-/**
- * All request info on the inlong stream page, including inlong stream, source, and stream sink
- */
-@Data
-@ApiModel("All request info on the inlong stream page")
-public class FullStreamRequest {
-
-    @ApiModelProperty("Inlong stream info")
-    private InlongStreamRequest streamInfo;
-
-    @ApiModelProperty("Source info list")
-    private List<SourceRequest> sourceInfo;
-
-    @ApiModelProperty("Sink info list")
-    private List<SinkRequest> sinkInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
deleted file mode 100644
index a480bfcf4..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-
-import java.util.List;
-
-/**
- * All response info on the inlong stream page, including inlong stream, source, and stream sink
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("All response info on the inlong stream page")
-public class FullStreamResponse {
-
-    @ApiModelProperty("Inlong stream information")
-    private InlongStreamInfo streamInfo;
-
-    @ApiModelProperty("Stream source information")
-    private List<StreamSource> sourceInfo;
-
-    @ApiModelProperty("Stream sink information")
-    private List<StreamSink> sinkInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 0606bcfb0..a700d7f42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -24,8 +24,11 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -102,8 +105,10 @@ public class InlongStreamInfo {
     @ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
     private Integer isDeleted = 0;
 
+    @ApiModelProperty(value = "Name of creator")
     private String creator;
 
+    @ApiModelProperty(value = "Name of modifier")
     private String modifier;
 
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@@ -118,6 +123,12 @@ public class InlongStreamInfo {
     @ApiModelProperty(value = "Inlong stream Extension properties")
     private List<InlongStreamExtInfo> extList;
 
+    @ApiModelProperty("Stream source infos")
+    private List<? extends StreamSource> sourceList = new ArrayList<>();
+
+    @ApiModelProperty("Stream sink infos")
+    private List<? extends StreamSink> sinkList = new ArrayList<>();
+
     public InlongStreamResponse genResponse() {
         return CommonBeanUtils.copyProperties(this, InlongStreamResponse::new);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 34a9bcb6f..1292d0c4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -101,8 +101,10 @@ public class InlongStreamResponse {
     @Builder.Default
     private Integer isDeleted = 0;
 
+    @ApiModelProperty(value = "Name of creator")
     private String creator;
 
+    @ApiModelProperty(value = "Name of modifier")
     private String modifier;
 
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
index d87f3d43d..09cc8a024 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -116,33 +114,13 @@ public interface InlongStreamService {
      */
     List<InlongStreamBriefInfo> getBriefList(String groupId);
 
-    /**
-     * Save all information related to the inlong stream, its data source, and stream sink
-     *
-     * @param fullStreamRequest All information on the page
-     * @param operator Edit person's name
-     * @return Whether the save was successful
-     */
-    boolean saveAll(FullStreamRequest fullStreamRequest, String operator);
-
-    /**
-     * Save inlong streams, their data sources, and all information related to stream sink in batches
-     *
-     * @param fullStreamRequestList List of inlong stream page information
-     * @param operator Edit person's name
-     * @return Whether the save was successful
-     * @apiNote This interface is only used when creating a new inlong group. To ensure data consistency,
-     *         all associated data needs to be physically deleted, and then added
-     */
-    boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator);
-
     /**
      * Paging query all data of the inlong stream page under the specified groupId
      *
      * @param request Query
      * @return Paging list of all data on the inlong stream page
      */
-    PageInfo<FullStreamResponse> listAllWithGroupId(InlongStreamPageRequest request);
+    PageInfo<InlongStreamInfo> listAllWithGroupId(InlongStreamPageRequest request);
 
     /**
      * According to the group id, query the number of valid inlong streams belonging to this service
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 7ba385da0..121c25e50 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -28,12 +28,8 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -366,76 +362,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         return briefInfoList;
     }
 
-    @Transactional(rollbackFor = Throwable.class)
-    @Override
-    public boolean saveAll(FullStreamRequest fullStreamRequest, String operator) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
-        }
-        Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
-        InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
-        Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
-
-        // Save inlong stream
-        save(streamRequest, operator);
-
-        // Save source info
-        if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
-            for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
-                sourceService.save(source, operator);
-            }
-        }
-
-        // Save sink info
-        if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
-            for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
-                sinkService.save(sinkInfo, operator);
-            }
-        }
-
-        LOGGER.info("success to save all stream page info");
-        return true;
-    }
-
-    @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator) {
-        if (CollectionUtils.isEmpty(fullStreamRequestList)) {
-            return true;
-        }
-        LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
-
-        // Check if it can be added
-        InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
-        Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
-        String groupId = firstStream.getInlongGroupId();
-        this.checkGroupStatusIsTemp(groupId);
-
-        // This bulk save is only used when creating or editing inlong group after approval is rejected.
-        // To ensure data consistency, you need to physically delete all associated data and then add
-        // Note: There may be records with the same groupId and streamId in the historical data,
-        // and the ones with is_deleted=0 should be deleted
-        streamMapper.deleteAllByGroupId(groupId);
-
-        for (FullStreamRequest pageInfo : fullStreamRequestList) {
-            // Delete the inlong stream extensions and fields corresponding to groupId and streamId
-            InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
-            String streamId = streamInfo.getInlongStreamId();
-            streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
-            streamExtMapper.deleteAllByRelatedId(groupId, streamId);
-            //  Delete all stream source
-            sourceService.deleteAll(groupId, streamId, operator);
-            // Delete all stream sink
-            sinkService.deleteAll(groupId, streamId, operator);
-            // Save the inlong stream of this batch
-            this.saveAll(pageInfo, operator);
-        }
-        LOGGER.info("success to batch save all stream page info");
-        return true;
-    }
-
-    @Override
-    public PageInfo<FullStreamResponse> listAllWithGroupId(InlongStreamPageRequest request) {
+    public PageInfo<InlongStreamInfo> listAllWithGroupId(InlongStreamPageRequest request) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to list full inlong stream page by {}", request);
         }
@@ -456,7 +384,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         List<InlongStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamInfo::new);
 
         // Convert and encapsulate the paged results
-        List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
         for (InlongStreamInfo streamInfo : streamInfoList) {
             // Set the field information of the inlong stream
             String streamId = streamInfo.getInlongStreamId();
@@ -466,22 +393,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
                     streamExtMapper.selectByRelatedId(groupId, streamId), InlongStreamExtInfo::new);
             streamInfo.setExtList(streamExtInfos);
 
-            FullStreamResponse pageInfo = new FullStreamResponse();
-            pageInfo.setStreamInfo(streamInfo);
-
             // Query stream sources information
             List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
-            pageInfo.setSourceInfo(sourceList);
+            streamInfo.setSourceList(sourceList);
 
             // Query various stream sinks and its extended information, field information
             List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
-            pageInfo.setSinkInfo(sinkList);
-
-            // Add a single result to the paginated list
-            responseList.add(pageInfo);
+            streamInfo.setSinkList(sinkList);
         }
 
-        PageInfo<FullStreamResponse> pageInfo = new PageInfo<>(responseList);
+        PageInfo<InlongStreamInfo> pageInfo = new PageInfo<>(streamInfoList);
         pageInfo.setTotal(pageInfo.getTotal());
 
         LOGGER.debug("success to list full inlong stream info");
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 38fbb26cd..3ba86cd4e 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -24,13 +24,12 @@ import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -67,21 +66,6 @@ public class InlongStreamController {
         return Response.success(result);
     }
 
-    @RequestMapping(value = "/saveAll", method = RequestMethod.POST)
-    @OperationLog(operation = OperationType.CREATE)
-    @ApiOperation(value = "Save inlong stream info page")
-    public Response<Boolean> saveAll(@RequestBody FullStreamRequest pageInfo) {
-        return Response.success(streamService.saveAll(pageInfo, LoginUserUtils.getLoginUserDetail().getUserName()));
-    }
-
-    @RequestMapping(value = "/batchSaveAll", method = RequestMethod.POST)
-    @OperationLog(operation = OperationType.CREATE)
-    @ApiOperation(value = "Save inlong stream page info in batch")
-    public Response<Boolean> batchSaveAll(@RequestBody List<FullStreamRequest> infoList) {
-        boolean result = streamService.batchSaveAll(infoList, LoginUserUtils.getLoginUserDetail().getUserName());
-        return Response.success(result);
-    }
-
     @RequestMapping(value = "/exist/{groupId}/{streamId}", method = RequestMethod.GET)
     @ApiOperation(value = "Is exists of the inlong stream")
     @ApiImplicitParams({
@@ -111,8 +95,8 @@ public class InlongStreamController {
     }
 
     @RequestMapping(value = "/listAll", method = RequestMethod.POST)
-    @ApiOperation(value = "Get all inlong stream info by paginating")
-    public Response<PageInfo<FullStreamResponse>> listAllWithGroupId(@RequestBody InlongStreamPageRequest request) {
+    @ApiOperation(value = "Get inlong stream info by paginating")
+    public Response<PageInfo<InlongStreamInfo>> listAllWithGroupId(@RequestBody InlongStreamPageRequest request) {
         request.setCurrentUser(LoginUserUtils.getLoginUserDetail().getUserName());
         request.setIsAdminRole(LoginUserUtils.getLoginUserDetail().getRoles().contains(UserRoleCode.ADMIN));
         return Response.success(streamService.listAllWithGroupId(request));