You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/27 02:33:26 UTC
[inlong] branch master updated: [INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1363c112f [INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)
1363c112f is described below
commit 1363c112f8344ca7a65bdb1ff08023a412c8744b
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 27 10:33:20 2022 +0800
[INLONG-4688][Manager] Remove unused FullStream-related classes and APIs (#4776)
---
.../inlong/manager/client/ut/Kafka2HiveTest.java | 199 ++++++++++-----------
.../inlong/manager/client/cli/DescribeCommand.java | 6 +-
.../inlong/manager/client/cli/ListCommand.java | 9 +-
.../manager/client/api/impl/InlongGroupImpl.java | 9 +-
.../manager/client/api/impl/InlongStreamImpl.java | 17 +-
.../client/api/inner/InnerInlongManagerClient.java | 9 +-
.../client/api/service/InlongStreamApi.java | 3 +-
.../client/api/impl/InlongGroupImplTest.java | 3 +
.../client/api/impl/InlongStreamImplTest.java | 2 +-
.../api/inner/InnerInlongManagerClientTest.java | 153 ++++++++--------
.../common/pojo/stream/FullStreamRequest.java | 44 -----
.../common/pojo/stream/FullStreamResponse.java | 50 ------
.../common/pojo/stream/InlongStreamInfo.java | 11 ++
.../common/pojo/stream/InlongStreamResponse.java | 2 +
.../manager/service/core/InlongStreamService.java | 24 +--
.../service/core/impl/InlongStreamServiceImpl.java | 87 +--------
.../web/controller/InlongStreamController.java | 24 +--
17 files changed, 221 insertions(+), 431 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index ac804b901..b30356ec2 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -243,90 +242,90 @@ class Kafka2HiveTest extends BaseTest {
)
);
- Response<PageInfo<FullStreamResponse>> fullStreamResponsePage = Response.success(
- new PageInfo<>(
- Lists.newArrayList(FullStreamResponse.builder()
- .streamInfo(InlongStreamInfo.builder()
- .id(8)
- .inlongGroupId(GROUP_ID)
- .inlongStreamId(STREAM_ID)
- .name(STREAM_ID)
- .mqResource("test_topic")
- .dataEncoding("UTF-8")
- .dataSeparator("|")
- .syncSend(1)
- .dailyRecords(10)
- .dailyStorage(10)
- .peakRecords(1000)
- .maxLength(10240)
- .storagePeriod(1)
- .status(120)
- .creator("admin")
- .modifier("admin")
- .createTime(new Date())
- .modifyTime(new Date())
- .fieldList(createStreamFields())
+ InlongStreamInfo streamInfo = InlongStreamInfo.builder()
+ .id(8)
+ .inlongGroupId(GROUP_ID)
+ .inlongStreamId(STREAM_ID)
+ .name(STREAM_ID)
+ .mqResource("test_topic")
+ .dataEncoding("UTF-8")
+ .dataSeparator("|")
+ .syncSend(1)
+ .dailyRecords(10)
+ .dailyStorage(10)
+ .peakRecords(1000)
+ .maxLength(10240)
+ .storagePeriod(1)
+ .status(120)
+ .creator("admin")
+ .modifier("admin")
+ .createTime(new Date())
+ .modifyTime(new Date())
+ .fieldList(createStreamFields())
+ .build();
+
+ ArrayList<KafkaSource> kafkaSources = Lists.newArrayList(
+ KafkaSource.builder()
+ .id(6)
+ .topic(TOPIC)
+ .bootstrapServers("{kafka.bootstrap}")
+ .inlongGroupId(GROUP_ID)
+ .inlongStreamId(STREAM_ID)
+ .sourceType("KAFKA")
+ .sourceName("{kafka.source.name}")
+ .serializationType("json")
+ .version(1)
+ .status(110)
+ .creator("admin")
+ .modifier("admin")
+ .createTime(new Date())
+ .modifyTime(new Date())
+ .build()
+ );
+
+ ArrayList<HiveSink> hiveSinks = Lists.newArrayList(
+ HiveSink.builder()
+ .id(6)
+ .inlongStreamId(STREAM_ID)
+ .inlongGroupId(GROUP_ID)
+ .jdbcUrl("jdbc:hive2://{ip:port}")
+ .dbName("test_db")
+ .tableName("test_table")
+ .dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
+ .fileFormat("TextFile")
+ .dataEncoding("UTF-8")
+ .dataSeparator("|")
+ .sinkType("HIVE")
+ .sinkName("sink_name")
+ .enableCreateResource(1)
+ .status(110)
+ .creator("admin")
+ .modifier("admin")
+ .dataFormat(DataFormat.NONE)
+ .sinkFieldList(Lists.newArrayList(
+ SinkField.builder()
+ .id(17)
+ .fieldName("age")
+ .fieldType("INT")
+ .fieldComment("age")
+ .sourceFieldName("age")
+ .sourceFieldType("INT")
+ .build(),
+ SinkField.builder()
+ .id(18)
+ .fieldName("name")
+ .fieldType("STRING")
+ .fieldComment("name")
+ .sourceFieldName("name")
+ .sourceFieldType("STRING")
.build()
- )
- .sourceInfo(Lists.newArrayList(
- KafkaSource.builder()
- .id(6)
- .topic(TOPIC)
- .bootstrapServers("{kafka.bootstrap}")
- .inlongGroupId(GROUP_ID)
- .inlongStreamId(STREAM_ID)
- .sourceType("KAFKA")
- .sourceName("{kafka.source.name}")
- .serializationType("json")
- .version(1)
- .status(110)
- .creator("admin")
- .modifier("admin")
- .createTime(new Date())
- .modifyTime(new Date())
- .build()
- ))
- .sinkInfo(Lists.newArrayList(
- HiveSink.builder()
- .id(6)
- .inlongStreamId(STREAM_ID)
- .inlongGroupId(GROUP_ID)
- .jdbcUrl("jdbc:hive2://{ip:port}")
- .dbName("test_db")
- .tableName("test_table")
- .dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
- .fileFormat("TextFile")
- .dataEncoding("UTF-8")
- .dataSeparator("|")
- .sinkType("HIVE")
- .sinkName("sink_name")
- .enableCreateResource(1)
- .status(110)
- .creator("admin")
- .modifier("admin")
- .dataFormat(DataFormat.NONE)
- .sinkFieldList(Lists.newArrayList(
- SinkField.builder()
- .id(17)
- .fieldName("age")
- .fieldType("INT")
- .fieldComment("age")
- .sourceFieldName("age")
- .sourceFieldType("INT")
- .build(),
- SinkField.builder()
- .id(18)
- .fieldName("name")
- .fieldType("STRING")
- .fieldComment("name")
- .sourceFieldName("name")
- .sourceFieldType("STRING")
- .build()
- ))
- .build()
- ))
- .build())
- )
+ ))
+ .build());
+ streamInfo.setSourceList(kafkaSources);
+ streamInfo.setSinkList(hiveSinks);
+
+ Response<PageInfo<InlongStreamInfo>> fullStreamResponsePage = Response.success(
+ new PageInfo<>(Lists.newArrayList(streamInfo))
);
stubFor(
post(urlMatching(MANAGER_URL_PREFIX + "/stream/listAll.*"))
@@ -382,22 +381,6 @@ class Kafka2HiveTest extends BaseTest {
);
}
- @Test
- void testCreateGroupForHive() {
- Assertions.assertDoesNotThrow(() -> {
- InlongGroup group = inlongClient.forGroup(groupInfo);
- InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
- streamBuilder.fields(createStreamFields());
- streamBuilder.source(createKafkaSource());
- streamBuilder.sink(createHiveSink());
- streamBuilder.initOrUpdate();
- // start group
- InlongGroupContext inlongGroupContext = group.init();
- Assertions.assertNotNull(inlongGroupContext);
- });
-
- }
-
private static KafkaSource createKafkaSource() {
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setBootstrapServers("127.0.0.1");
@@ -413,4 +396,20 @@ class Kafka2HiveTest extends BaseTest {
new StreamField(1, FieldType.INT.toString(), "age", null, null)
);
}
+
+ @Test
+ void testCreateGroupForHive() {
+ Assertions.assertDoesNotThrow(() -> {
+ InlongGroup group = inlongClient.forGroup(groupInfo);
+ InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
+ streamBuilder.fields(createStreamFields());
+ streamBuilder.source(createKafkaSource());
+ streamBuilder.sink(createHiveSink());
+ streamBuilder.initOrUpdate();
+ // start group
+ InlongGroupContext inlongGroupContext = group.init();
+ Assertions.assertNotNull(inlongGroupContext);
+ });
+
+ }
}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
index c81a4f695..f8032cc6d 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import java.io.IOException;
import java.util.List;
@@ -79,8 +79,8 @@ public class DescribeCommand extends AbstractCommand {
@Override
void run() {
try {
- List<FullStreamResponse> fullStreamResponseList = managerClient.listStreamInfo(groupId);
- fullStreamResponseList.forEach(response -> PrintUtils.printJson(response.getStreamInfo()));
+ List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+ streamInfos.forEach(PrintUtils::printJson);
} catch (Exception e) {
System.out.println(e.getMessage());
}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 2a8a46291..def185ff7 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -33,12 +33,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Get main information of resources.
@@ -86,11 +84,8 @@ public class ListCommand extends AbstractCommand {
@Override
void run() {
try {
- List<FullStreamResponse> streamResponseList = managerClient.listStreamInfo(groupId);
- List<InlongStreamInfo> streamInfoList = streamResponseList.stream()
- .map(FullStreamResponse::getStreamInfo)
- .collect(Collectors.toList());
- PrintUtils.print(streamInfoList, StreamInfo.class);
+ List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+ PrintUtils.print(streamInfos, StreamInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 3e7447f2c..521ea0d19 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -305,12 +304,12 @@ public class InlongGroupImpl implements InlongGroup {
}
private List<InlongStream> fetchInlongStreams(String groupId) {
- List<FullStreamResponse> streamResponses = managerClient.listStreamInfo(groupId);
- if (CollectionUtils.isEmpty(streamResponses)) {
+ List<InlongStreamInfo> streamInfos = managerClient.listStreamInfo(groupId);
+ if (CollectionUtils.isEmpty(streamInfos)) {
return null;
}
- return streamResponses.stream()
- .map(response -> new InlongStreamImpl(response, managerClient))
+ return streamInfos.stream()
+ .map(streamInfo -> new InlongStreamImpl(streamInfo, managerClient))
.collect(Collectors.toList());
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 56b636666..870810301 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelation;
@@ -70,8 +69,7 @@ public class InlongStreamImpl implements InlongStream {
/**
* Constructor of InlongStreamImpl.
*/
- public InlongStreamImpl(FullStreamResponse streamResponse, InnerInlongManagerClient managerClient) {
- InlongStreamInfo streamInfo = streamResponse.getStreamInfo();
+ public InlongStreamImpl(InlongStreamInfo streamInfo, InnerInlongManagerClient managerClient) {
this.managerClient = managerClient;
this.inlongGroupId = streamInfo.getInlongGroupId();
this.inlongStreamId = streamInfo.getInlongStreamId();
@@ -90,18 +88,19 @@ public class InlongStreamImpl implements InlongStream {
)
).collect(Collectors.toList());
}
- List<StreamSink> responseList = streamResponse.getSinkInfo();
- if (CollectionUtils.isNotEmpty(responseList)) {
- this.streamSinks = responseList.stream()
+
+ List<? extends StreamSink> sinkInfos = streamInfo.getSinkList();
+ if (CollectionUtils.isNotEmpty(sinkInfos)) {
+ this.streamSinks = sinkInfos.stream()
.collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink,
(sink1, sink2) -> {
throw new RuntimeException(String.format("duplicate sinkName:%s in stream:%s",
sink1.getSinkName(), this.inlongStreamId));
}));
}
- List<StreamSource> sourceList = streamResponse.getSourceInfo();
- if (CollectionUtils.isNotEmpty(sourceList)) {
- this.streamSources = sourceList.stream()
+ List<? extends StreamSource> sourceInfos = streamInfo.getSourceList();
+ if (CollectionUtils.isNotEmpty(sourceInfos)) {
+ this.streamSources = sourceInfos.stream()
.collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource,
(source1, source2) -> {
throw new RuntimeException(String.format("duplicate sourceName: %s in streamId: %s",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index e0f4eeb66..03979ad03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -49,7 +49,6 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
@@ -78,11 +77,9 @@ import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD
@Slf4j
public class InnerInlongManagerClient {
- private final ObjectMapper objectMapper = new ObjectMapper();
-
protected final String host;
protected final int port;
-
+ private final ObjectMapper objectMapper = new ObjectMapper();
private final InlongStreamApi inlongStreamApi;
private final InlongGroupApi inlongGroupApi;
private final StreamSourceApi streamSourceApi;
@@ -288,11 +285,11 @@ public class InnerInlongManagerClient {
/**
* Get information of stream.
*/
- public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
+ public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
pageRequest.setInlongGroupId(inlongGroupId);
- Response<PageInfo<FullStreamResponse>> response = executeHttpCall(inlongStreamApi.listStream(pageRequest));
+ Response<PageInfo<InlongStreamInfo>> response = executeHttpCall(inlongStreamApi.listStream(pageRequest));
assertRespSuccess(response);
return response.getData().getList();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 237ecfd4a..ad32ef1d1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
@@ -46,7 +45,7 @@ public interface InlongStreamApi {
@Query("streamId") String streamId);
@POST("stream/listAll")
- Call<Response<PageInfo<FullStreamResponse>>> listStream(@Body InlongStreamPageRequest request);
+ Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body InlongStreamPageRequest request);
@GET("stream/config/log/list")
Call<Response<PageInfo<InlongStreamConfigLogListResponse>>> getStreamLogs(@Query("inlongGroupId") String groupId,
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
index 5d3d082b8..67f9ba90a 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
@@ -23,6 +23,9 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+/**
+ * Unit test for {@link InlongGroupImpl}
+ */
class InlongGroupImplTest {
@Test
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index f08390c0b..42048155c 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
- * Test class for creat inlong stream.
+ * Unit test for {@link InlongStreamImpl}
*/
public class InlongStreamImplTest {
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
index 915167b8c..82786e7d9 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.es.ElasticsearchSinkListResponse;
@@ -47,6 +48,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSource;
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.file.FileSource;
@@ -55,7 +57,6 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSourceListResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -77,15 +78,14 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
/**
- * Test class for InnerInlongManagerClientTest.
+ * Unit test for {@link InnerInlongManagerClient}.
*/
@Slf4j
class InnerInlongManagerClientTest {
- public static WireMockServer wireMockServer;
- public static InnerInlongManagerClient innerInlongManagerClient;
-
private static final int SERVICE_PORT = 8085;
+ private static WireMockServer wireMockServer;
+ private static InnerInlongManagerClient innerInlongManagerClient;
@BeforeAll
static void setup() {
@@ -487,95 +487,92 @@ class InnerInlongManagerClientTest {
@Test
void testListStream4AllSink() {
- FullStreamResponse fullStreamResponse = FullStreamResponse.builder()
- .streamInfo(
- InlongStreamInfo.builder()
- .id(1)
- .inlongGroupId("11")
- .inlongStreamId("11")
- .fieldList(
- Lists.newArrayList(
- StreamField.builder()
- .id(1)
- .inlongGroupId("123")
- .inlongGroupId("11")
- .build(),
- StreamField.builder()
- .id(2)
- .isMetaField(1)
- .fieldFormat("yyyy-MM-dd HH:mm:ss")
- .build()
- )
- ).build()
- )
- .sourceInfo(
+ InlongStreamInfo streamInfo = InlongStreamInfo.builder()
+ .id(1)
+ .inlongGroupId("11")
+ .inlongStreamId("11")
+ .fieldList(
Lists.newArrayList(
- AutoPushSource.builder()
+ StreamField.builder()
.id(1)
- .inlongStreamId("11")
+ .inlongGroupId("123")
.inlongGroupId("11")
- .sourceType("AUTO_PUSH")
- .createTime(new Date())
-
- .dataProxyGroup("111")
- .build(),
- MySQLBinlogSource.builder()
- .id(2)
- .sourceType("BINLOG")
- .user("user")
- .password("pwd")
- .build(),
- FileSource.builder()
- .id(3)
- .sourceType("FILE")
- .agentIp("127.0.0.1")
- .pattern("pattern")
.build(),
- KafkaSource.builder()
- .id(4)
- .sourceType("KAFKA")
- .autoOffsetReset("11")
- .bootstrapServers("10.110.221.22")
- .build()
- )
- )
- .sinkInfo(
- Lists.newArrayList(
- HiveSink.builder()
- .sinkType("HIVE")
- .id(1)
- .jdbcUrl("127.0.0.1")
- .build(),
- ClickHouseSink.builder()
- .sinkType("CLICKHOUSE")
+ StreamField.builder()
.id(2)
- .flushInterval(11)
- .build(),
- IcebergSink.builder()
- .sinkType("ICEBERG")
- .id(3)
- .dataPath("hdfs://aabb")
- .build(),
- KafkaSink.builder()
- .sinkType("KAFKA")
- .id(4)
- .bootstrapServers("127.0.0.1")
+ .isMetaField(1)
+ .fieldFormat("yyyy-MM-dd HH:mm:ss")
.build()
)
).build();
+ ArrayList<StreamSource> sourceList = Lists.newArrayList(
+ AutoPushSource.builder()
+ .id(1)
+ .inlongStreamId("11")
+ .inlongGroupId("11")
+ .sourceType("AUTO_PUSH")
+ .createTime(new Date())
+
+ .dataProxyGroup("111")
+ .build(),
+ MySQLBinlogSource.builder()
+ .id(2)
+ .sourceType("BINLOG")
+ .user("user")
+ .password("pwd")
+ .build(),
+ FileSource.builder()
+ .id(3)
+ .sourceType("FILE")
+ .agentIp("127.0.0.1")
+ .pattern("pattern")
+ .build(),
+ KafkaSource.builder()
+ .id(4)
+ .sourceType("KAFKA")
+ .autoOffsetReset("11")
+ .bootstrapServers("10.110.221.22")
+ .build()
+ );
+
+ ArrayList<StreamSink> sinkList = Lists.newArrayList(
+ HiveSink.builder()
+ .sinkType("HIVE")
+ .id(1)
+ .jdbcUrl("127.0.0.1")
+ .build(),
+ ClickHouseSink.builder()
+ .sinkType("CLICKHOUSE")
+ .id(2)
+ .flushInterval(11)
+ .build(),
+ IcebergSink.builder()
+ .sinkType("ICEBERG")
+ .id(3)
+ .dataPath("hdfs://aabb")
+ .build(),
+ KafkaSink.builder()
+ .sinkType("KAFKA")
+ .id(4)
+ .bootstrapServers("127.0.0.1")
+ .build()
+ );
+
+ streamInfo.setSourceList(sourceList);
+ streamInfo.setSinkList(sinkList);
+
stubFor(
post(urlMatching("/api/inlong/manager/stream/listAll.*"))
.willReturn(
- okJson(JsonUtils.toJsonString(Response.success(
- new PageInfo<>(Lists.newArrayList(fullStreamResponse))))
+ okJson(JsonUtils.toJsonString(
+ Response.success(new PageInfo<>(Lists.newArrayList(streamInfo))))
)
)
);
- List<FullStreamResponse> fullStreamResponses = innerInlongManagerClient.listStreamInfo("11");
- Assertions.assertEquals(JsonUtils.toJsonString(fullStreamResponse),
- JsonUtils.toJsonString(fullStreamResponses.get(0)));
+ List<InlongStreamInfo> streamInfos = innerInlongManagerClient.listStreamInfo("11");
+ Assertions.assertEquals(JsonUtils.toJsonString(streamInfo), JsonUtils.toJsonString(streamInfos.get(0)));
}
@Test
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
deleted file mode 100644
index 5468f6d5c..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-
-import java.util.List;
-
-/**
- * All request info on the inlong stream page, including inlong stream, source, and stream sink
- */
-@Data
-@ApiModel("All request info on the inlong stream page")
-public class FullStreamRequest {
-
- @ApiModelProperty("Inlong stream info")
- private InlongStreamRequest streamInfo;
-
- @ApiModelProperty("Source info list")
- private List<SourceRequest> sourceInfo;
-
- @ApiModelProperty("Sink info list")
- private List<SinkRequest> sinkInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
deleted file mode 100644
index a480bfcf4..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-
-import java.util.List;
-
-/**
- * All response info on the inlong stream page, including inlong stream, source, and stream sink
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("All response info on the inlong stream page")
-public class FullStreamResponse {
-
- @ApiModelProperty("Inlong stream information")
- private InlongStreamInfo streamInfo;
-
- @ApiModelProperty("Stream source information")
- private List<StreamSource> sourceInfo;
-
- @ApiModelProperty("Stream sink information")
- private List<StreamSink> sinkInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 0606bcfb0..a700d7f42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -24,8 +24,11 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -102,8 +105,10 @@ public class InlongStreamInfo {
@ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
private Integer isDeleted = 0;
+ @ApiModelProperty(value = "Name of creator")
private String creator;
+ @ApiModelProperty(value = "Name of modifier")
private String modifier;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@@ -118,6 +123,12 @@ public class InlongStreamInfo {
@ApiModelProperty(value = "Inlong stream Extension properties")
private List<InlongStreamExtInfo> extList;
+ @ApiModelProperty("Stream source infos")
+ private List<? extends StreamSource> sourceList = new ArrayList<>();
+
+ @ApiModelProperty("Stream sink infos")
+ private List<? extends StreamSink> sinkList = new ArrayList<>();
+
public InlongStreamResponse genResponse() {
return CommonBeanUtils.copyProperties(this, InlongStreamResponse::new);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 34a9bcb6f..1292d0c4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -101,8 +101,10 @@ public class InlongStreamResponse {
@Builder.Default
private Integer isDeleted = 0;
+ @ApiModelProperty(value = "Name of creator")
private String creator;
+ @ApiModelProperty(value = "Name of modifier")
private String modifier;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
index d87f3d43d..09cc8a024 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
@@ -18,8 +18,6 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -116,33 +114,13 @@ public interface InlongStreamService {
*/
List<InlongStreamBriefInfo> getBriefList(String groupId);
- /**
- * Save all information related to the inlong stream, its data source, and stream sink
- *
- * @param fullStreamRequest All information on the page
- * @param operator Edit person's name
- * @return Whether the save was successful
- */
- boolean saveAll(FullStreamRequest fullStreamRequest, String operator);
-
- /**
- * Save inlong streams, their data sources, and all information related to stream sink in batches
- *
- * @param fullStreamRequestList List of inlong stream page information
- * @param operator Edit person's name
- * @return Whether the save was successful
- * @apiNote This interface is only used when creating a new inlong group. To ensure data consistency,
- * all associated data needs to be physically deleted, and then added
- */
- boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator);
-
/**
* Paging query all data of the inlong stream page under the specified groupId
*
* @param request Query
* @return Paging list of all data on the inlong stream page
*/
- PageInfo<FullStreamResponse> listAllWithGroupId(InlongStreamPageRequest request);
+ PageInfo<InlongStreamInfo> listAllWithGroupId(InlongStreamPageRequest request);
/**
* According to the group id, query the number of valid inlong streams belonging to this service
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 7ba385da0..121c25e50 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -28,12 +28,8 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -366,76 +362,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
return briefInfoList;
}
- @Transactional(rollbackFor = Throwable.class)
- @Override
- public boolean saveAll(FullStreamRequest fullStreamRequest, String operator) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
- }
- Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
- InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
- Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
-
- // Save inlong stream
- save(streamRequest, operator);
-
- // Save source info
- if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
- for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
- sourceService.save(source, operator);
- }
- }
-
- // Save sink info
- if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
- for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
- sinkService.save(sinkInfo, operator);
- }
- }
-
- LOGGER.info("success to save all stream page info");
- return true;
- }
-
- @Transactional(rollbackFor = Throwable.class)
@Override
- public boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator) {
- if (CollectionUtils.isEmpty(fullStreamRequestList)) {
- return true;
- }
- LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
-
- // Check if it can be added
- InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
- Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
- String groupId = firstStream.getInlongGroupId();
- this.checkGroupStatusIsTemp(groupId);
-
- // This bulk save is only used when creating or editing inlong group after approval is rejected.
- // To ensure data consistency, you need to physically delete all associated data and then add
- // Note: There may be records with the same groupId and streamId in the historical data,
- // and the ones with is_deleted=0 should be deleted
- streamMapper.deleteAllByGroupId(groupId);
-
- for (FullStreamRequest pageInfo : fullStreamRequestList) {
- // Delete the inlong stream extensions and fields corresponding to groupId and streamId
- InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
- String streamId = streamInfo.getInlongStreamId();
- streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
- streamExtMapper.deleteAllByRelatedId(groupId, streamId);
- // Delete all stream source
- sourceService.deleteAll(groupId, streamId, operator);
- // Delete all stream sink
- sinkService.deleteAll(groupId, streamId, operator);
- // Save the inlong stream of this batch
- this.saveAll(pageInfo, operator);
- }
- LOGGER.info("success to batch save all stream page info");
- return true;
- }
-
- @Override
- public PageInfo<FullStreamResponse> listAllWithGroupId(InlongStreamPageRequest request) {
+ public PageInfo<InlongStreamInfo> listAllWithGroupId(InlongStreamPageRequest request) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to list full inlong stream page by {}", request);
}
@@ -456,7 +384,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
List<InlongStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamInfo::new);
// Convert and encapsulate the paged results
- List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
for (InlongStreamInfo streamInfo : streamInfoList) {
// Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
@@ -466,22 +393,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamExtMapper.selectByRelatedId(groupId, streamId), InlongStreamExtInfo::new);
streamInfo.setExtList(streamExtInfos);
- FullStreamResponse pageInfo = new FullStreamResponse();
- pageInfo.setStreamInfo(streamInfo);
-
// Query stream sources information
List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
- pageInfo.setSourceInfo(sourceList);
+ streamInfo.setSourceList(sourceList);
// Query various stream sinks and its extended information, field information
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
- pageInfo.setSinkInfo(sinkList);
-
- // Add a single result to the paginated list
- responseList.add(pageInfo);
+ streamInfo.setSinkList(sinkList);
}
- PageInfo<FullStreamResponse> pageInfo = new PageInfo<>(responseList);
+ PageInfo<InlongStreamInfo> pageInfo = new PageInfo<>(streamInfoList);
pageInfo.setTotal(pageInfo.getTotal());
LOGGER.debug("success to list full inlong stream info");
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 38fbb26cd..3ba86cd4e 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -24,13 +24,12 @@ import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -67,21 +66,6 @@ public class InlongStreamController {
return Response.success(result);
}
- @RequestMapping(value = "/saveAll", method = RequestMethod.POST)
- @OperationLog(operation = OperationType.CREATE)
- @ApiOperation(value = "Save inlong stream info page")
- public Response<Boolean> saveAll(@RequestBody FullStreamRequest pageInfo) {
- return Response.success(streamService.saveAll(pageInfo, LoginUserUtils.getLoginUserDetail().getUserName()));
- }
-
- @RequestMapping(value = "/batchSaveAll", method = RequestMethod.POST)
- @OperationLog(operation = OperationType.CREATE)
- @ApiOperation(value = "Save inlong stream page info in batch")
- public Response<Boolean> batchSaveAll(@RequestBody List<FullStreamRequest> infoList) {
- boolean result = streamService.batchSaveAll(infoList, LoginUserUtils.getLoginUserDetail().getUserName());
- return Response.success(result);
- }
-
@RequestMapping(value = "/exist/{groupId}/{streamId}", method = RequestMethod.GET)
@ApiOperation(value = "Is exists of the inlong stream")
@ApiImplicitParams({
@@ -111,8 +95,8 @@ public class InlongStreamController {
}
@RequestMapping(value = "/listAll", method = RequestMethod.POST)
- @ApiOperation(value = "Get all inlong stream info by paginating")
- public Response<PageInfo<FullStreamResponse>> listAllWithGroupId(@RequestBody InlongStreamPageRequest request) {
+ @ApiOperation(value = "Get inlong stream info by paginating")
+ public Response<PageInfo<InlongStreamInfo>> listAllWithGroupId(@RequestBody InlongStreamPageRequest request) {
request.setCurrentUser(LoginUserUtils.getLoginUserDetail().getUserName());
request.setIsAdminRole(LoginUserUtils.getLoginUserDetail().getRoles().contains(UserRoleCode.ADMIN));
return Response.success(streamService.listAllWithGroupId(request));