You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/24 02:14:45 UTC
[incubator-inlong] branch master updated: [INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c1225c5 [INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)
c1225c5 is described below
commit c1225c53bd3bdaff0e845c1d4e25d10c57a141c0
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 24 10:14:39 2022 +0800
[INLONG-3300][Manager] Optimize the interface of the inlong-stream page (#3313)
---
.../api/impl/DefaultInlongStreamBuilder.java | 14 +-
.../manager/client/api/impl/InlongClientImpl.java | 12 +-
.../manager/client/api/impl/InlongStreamImpl.java | 4 +-
.../client/api/inner/InnerInlongManagerClient.java | 32 ++-
.../client/api/inner/InnerStreamContext.java | 6 +-
.../manager/client/api/util/InlongParser.java | 6 +-
.../client/api/util/InlongStreamSinkTransfer.java | 10 +-
.../api/util/InlongStreamSourceTransfer.java | 12 +-
.../client/api/util/InlongStreamTransfer.java | 8 +-
.../inlong/manager/common/enums/SourceType.java | 1 +
.../common/pojo/stream/FullPageUpdateRequest.java | 43 ----
.../common/pojo/stream/FullStreamRequest.java | 24 +--
.../common/pojo/stream/FullStreamResponse.java | 18 +-
...ongStreamInfo.java => InlongStreamRequest.java} | 46 +---
...ngStreamInfo.java => InlongStreamResponse.java} | 16 +-
.../workflow/form/GroupResourceProcessForm.java | 4 +-
.../pojo/workflow/form/UpdateGroupProcessForm.java | 4 +-
.../manager/service/CommonOperateService.java | 4 +-
.../manager/service/core/InlongStreamService.java | 33 +--
.../core/impl/InlongGroupProcessOperation.java | 8 +-
.../service/core/impl/InlongStreamServiceImpl.java | 236 ++++++++-------------
.../thirdparty/sort/PushSortConfigListener.java | 4 +-
.../thirdparty/sort/util/SerializationUtils.java | 9 +-
.../thirdparty/sort/util/SourceInfoUtils.java | 6 +-
.../listener/StartCreateGroupProcessListener.java | 8 +-
.../service/core/impl/InlongStreamServiceTest.java | 21 +-
.../source/listener/DataSourceListenerTest.java | 8 +-
.../thirdparty/sort/DisableZkForSortTest.java | 10 +-
.../service/workflow/WorkflowServiceImplTest.java | 31 +--
.../web/controller/InlongStreamController.java | 18 +-
inlong-manager/pom.xml | 5 +
31 files changed, 254 insertions(+), 407 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 9abf6c9..a391f5b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -42,7 +42,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.List;
@@ -68,11 +68,11 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
groupContext.setStreamContextMap(Maps.newHashMap());
}
- InlongStreamInfo streamInfo = InlongStreamTransfer.createStreamInfo(streamConf, groupContext.getGroupInfo());
- InnerStreamContext streamContext = new InnerStreamContext(streamInfo);
+ InlongStreamResponse stream = InlongStreamTransfer.createStreamInfo(streamConf, groupContext.getGroupInfo());
+ InnerStreamContext streamContext = new InnerStreamContext(stream);
groupContext.setStreamContext(streamContext);
this.streamContext = streamContext;
- this.inlongStream = new InlongStreamImpl(streamInfo.getName());
+ this.inlongStream = new InlongStreamImpl(stream.getName());
if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
this.inlongStream.setStreamFields(streamConf.getStreamFields());
}
@@ -107,7 +107,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
@Override
public InlongStream init() {
- InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ InlongStreamResponse streamInfo = streamContext.getStreamInfo();
String streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(Double.valueOf(streamIndex).intValue());
//Create source and update index
@@ -127,8 +127,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
@Override
public InlongStream initOrUpdate() {
- InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
- Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
+ InlongStreamResponse dataStreamInfo = streamContext.getStreamInfo();
+ Pair<Boolean, InlongStreamResponse> existMsg = managerClient.isStreamExists(dataStreamInfo);
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
if (updateMsg.getKey()) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 9be4a19..ac44032 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -49,7 +49,7 @@ public class InlongClientImpl implements InlongClient {
private static final String URL_SPLITTER = ",";
private static final String HOST_SPLITTER = ":";
@Getter
- private ClientConfiguration configuration;
+ private final ClientConfiguration configuration;
public InlongClientImpl(String serviceUrl, ClientConfiguration configuration) {
Map<String, String> hostPorts = Splitter.on(URL_SPLITTER).withKeyValueSeparator(HOST_SPLITTER)
@@ -76,13 +76,12 @@ public class InlongClientImpl implements InlongClient {
}
@Override
- public InlongGroup forGroup(InlongGroupConf groupConf) throws Exception {
+ public InlongGroup forGroup(InlongGroupConf groupConf) {
return new InlongGroupImpl(groupConf, this);
}
@Override
- public List<InlongGroup> listGroup(String expr, int status,
- int pageNum, int pageSize) throws Exception {
+ public List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) {
InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
PageInfo<InlongGroupListResponse> responsePageInfo = managerClient.listGroups(expr, status, pageNum,
pageSize);
@@ -103,7 +102,6 @@ public class InlongClientImpl implements InlongClient {
*
* @param request The request
* @return PageInfo of group
- * @throws Exception
*/
@Override
public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
@@ -112,7 +110,7 @@ public class InlongClientImpl implements InlongClient {
}
@Override
- public InlongGroup getGroup(String groupName) throws Exception {
+ public InlongGroup getGroup(String groupName) {
InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
final String groupId = "b_" + groupName;
InlongGroupResponse groupResponse = managerClient.getGroupInfo(groupId);
@@ -136,7 +134,7 @@ public class InlongClientImpl implements InlongClient {
try {
socket.close();
} catch (IOException e) {
- log.warn("colse connection from {}:{} failed", host, port, e);
+ log.warn("close connection from {}:{} failed", host, port, e);
}
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index a102181..e1b6180 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -35,7 +35,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.List;
import java.util.Map;
@@ -55,7 +55,7 @@ public class InlongStreamImpl extends InlongStream {
private List<StreamField> streamFields = Lists.newArrayList();
public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
- InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
+ InlongStreamResponse streamInfo = fullStreamResponse.getStreamInfo();
this.name = streamInfo.getName();
List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 974fbac..3d298a7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -50,8 +50,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -166,7 +166,7 @@ public class InnerInlongManagerClient {
pageNum = 1;
}
JSONObject groupQuery = new JSONObject();
- groupQuery.put("keyWord", pageNum);
+ groupQuery.put("keyWord", keyword);
groupQuery.put("status", status);
groupQuery.put("pageNum", pageNum);
groupQuery.put("pageSize", pageSize);
@@ -205,10 +205,8 @@ public class InnerInlongManagerClient {
*
* @param inlongGroupRequest The inlongGroupRequest
* @return Response encapsulate of inlong group list
- * @throws Exception may throws exception
*/
- public Response<PageInfo<InlongGroupListResponse>> listGroups(
- InlongGroupPageRequest inlongGroupRequest)
+ public Response<PageInfo<InlongGroupListResponse>> listGroups(InlongGroupPageRequest inlongGroupRequest)
throws Exception {
String requestParams = GsonUtil.toJson(inlongGroupRequest);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestParams);
@@ -284,7 +282,7 @@ public class InnerInlongManagerClient {
}
}
- public String createStreamInfo(InlongStreamInfo streamInfo) {
+ public String createStreamInfo(InlongStreamResponse streamInfo) {
String path = HTTP_PATH + "/stream/save";
final String stream = GsonUtil.toJson(streamInfo);
final RequestBody streamBody = RequestBody.create(MediaType.parse("application/json"), stream);
@@ -309,8 +307,8 @@ public class InnerInlongManagerClient {
}
}
- public Pair<Boolean, InlongStreamInfo> isStreamExists(InlongStreamInfo streamInfo) {
- InlongStreamInfo currentStreamInfo = getStreamInfo(streamInfo);
+ public Pair<Boolean, InlongStreamResponse> isStreamExists(InlongStreamResponse streamInfo) {
+ InlongStreamResponse currentStreamInfo = getStreamInfo(streamInfo);
if (currentStreamInfo != null) {
return Pair.of(true, currentStreamInfo);
} else {
@@ -318,7 +316,7 @@ public class InnerInlongManagerClient {
}
}
- public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) {
+ public Pair<Boolean, String> updateStreamInfo(InlongStreamResponse streamInfo) {
streamInfo.setCreateTime(null);
streamInfo.setModifyTime(null);
final String path = HTTP_PATH + "/stream/update";
@@ -346,7 +344,7 @@ public class InnerInlongManagerClient {
}
}
- public InlongStreamInfo getStreamInfo(InlongStreamInfo streamInfo) {
+ public InlongStreamResponse getStreamInfo(InlongStreamResponse streamInfo) {
String path = HTTP_PATH + "/stream/get";
String url = formatUrl(path);
url += String.format("&groupId=%s&streamId=%s", streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
@@ -375,15 +373,15 @@ public class InnerInlongManagerClient {
}
public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
+ InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
+ pageRequest.setInlongGroupId(inlongGroupId);
+ String requestParams = GsonUtil.toJson(pageRequest);
+ RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestParams);
final String path = HTTP_PATH + "/stream/listAll";
- String url = formatUrl(path);
- InlongStreamPageRequest inlongStreamPageRequest = new InlongStreamPageRequest();
- inlongStreamPageRequest.setInlongGroupId(inlongGroupId);
- final String source = GsonUtil.toJson(inlongStreamPageRequest);
- final RequestBody sourceBody = RequestBody.create(MediaType.parse("application/json"), source);
- Request request = new Request.Builder()
+ final String url = formatUrl(path);
+ Request request = new Request.Builder().get()
.url(url)
- .post(sourceBody)
+ .post(requestBody)
.build();
Call call = httpClient.newCall(request);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 5b85758..1dc3d32 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -23,7 +23,7 @@ import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.List;
import java.util.Map;
@@ -32,13 +32,13 @@ import java.util.Map;
@NoArgsConstructor
public class InnerStreamContext {
- private InlongStreamInfo streamInfo;
+ private InlongStreamResponse streamInfo;
private Map<String, SourceRequest> sourceRequests = Maps.newHashMap();
private Map<String, SinkRequest> sinkRequests = Maps.newHashMap();
- public InnerStreamContext(InlongStreamInfo streamInfo) {
+ public InnerStreamContext(InlongStreamResponse streamInfo) {
this.streamInfo = streamInfo;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4a61ca7..c608960 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -46,7 +46,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -103,9 +103,9 @@ public class InlongParser {
}.getType());
}
- public static InlongStreamInfo parseStreamInfo(Response response) {
+ public static InlongStreamResponse parseStreamInfo(Response response) {
Object data = response.getData();
- return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
+ return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamResponse.class);
}
public static List<FullStreamResponse> parseStreamList(Response response) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 28a138c..dccbdfb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import java.nio.charset.Charset;
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
public class InlongStreamSinkTransfer {
- public static SinkRequest createSinkRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+ public static SinkRequest createSinkRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
SinkType sinkType = streamSink.getSinkType();
SinkRequest sinkRequest;
if (sinkType == SinkType.HIVE) {
@@ -84,7 +84,7 @@ public class InlongStreamSinkTransfer {
return streamSinkResult;
}
- private static SinkRequest createClickHouseRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+ private static SinkRequest createClickHouseRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
ClickHouseSinkRequest clickHouseSinkRequest = new ClickHouseSinkRequest();
ClickHouseSink clickHouseSink = (ClickHouseSink) streamSink;
clickHouseSinkRequest.setSinkName(clickHouseSink.getSinkName());
@@ -157,7 +157,7 @@ public class InlongStreamSinkTransfer {
}
- private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+ private static SinkRequest createKafkaRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
KafkaSinkRequest kafkaSinkRequest = new KafkaSinkRequest();
KafkaSink kafkaSink = (KafkaSink) streamSink;
kafkaSinkRequest.setSinkName(streamSink.getSinkName());
@@ -200,7 +200,7 @@ public class InlongStreamSinkTransfer {
return kafkaSink;
}
- private static HiveSinkRequest createHiveRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+ private static HiveSinkRequest createHiveRequest(StreamSink streamSink, InlongStreamResponse streamInfo) {
HiveSinkRequest hiveSinkRequest = new HiveSinkRequest();
HiveSink hiveSink = (HiveSink) streamSink;
hiveSinkRequest.setSinkName(streamSink.getSinkName());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 92a8e7a..f90d995 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -38,7 +38,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.Arrays;
@@ -47,7 +47,7 @@ import java.util.Arrays;
*/
public class InlongStreamSourceTransfer {
- public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
+ public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamResponse streamInfo) {
SourceType sourceType = streamSource.getSourceType();
switch (sourceType) {
case KAFKA:
@@ -176,11 +176,11 @@ public class InlongStreamSourceTransfer {
return binlogSource;
}
- private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
+ private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamResponse stream) {
KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
sourceRequest.setSourceName(kafkaSource.getSourceName());
- sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
- sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setInlongGroupId(stream.getInlongGroupId());
+ sourceRequest.setInlongStreamId(stream.getInlongStreamId());
sourceRequest.setSourceType(kafkaSource.getSourceType().name());
sourceRequest.setAgentIp(kafkaSource.getAgentIp());
sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
@@ -199,7 +199,7 @@ public class InlongStreamSourceTransfer {
}
private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
- InlongStreamInfo streamInfo) {
+ InlongStreamResponse streamInfo) {
BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
sourceRequest.setSourceName(binlogSource.getSourceName());
sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 522affc..0ca083b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -23,15 +23,15 @@ import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.List;
import java.util.stream.Collectors;
public class InlongStreamTransfer {
- public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
- InlongStreamInfo dataStreamInfo = new InlongStreamInfo();
+ public static InlongStreamResponse createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
+ InlongStreamResponse dataStreamInfo = new InlongStreamResponse();
dataStreamInfo.setInlongGroupId(groupInfo.getInlongGroupId());
final String streamId = "b_" + streamConf.getName();
dataStreamInfo.setInlongStreamId(streamId);
@@ -58,7 +58,7 @@ public class InlongStreamTransfer {
public static List<InlongStreamFieldInfo> createStreamFields(
List<StreamField> fieldList,
- InlongStreamInfo streamInfo) {
+ InlongStreamResponse streamInfo) {
List<InlongStreamFieldInfo> fieldInfos = fieldList.stream().map(streamField -> {
InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
fieldInfo.setInlongStreamId(streamInfo.getInlongStreamId());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index e7f5f2f..19c17af 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -27,6 +27,7 @@ import java.util.Locale;
*/
public enum SourceType {
+ AUTO_PUSH("AUTO_PUSH", null),
FILE("FILE", TaskTypeEnum.FILE),
SQL("SQL", TaskTypeEnum.SQL),
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java
deleted file mode 100644
index a69112a..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullPageUpdateRequest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.stream;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-
-/**
- * Inlong stream page update request,
- * <p/>including the basic info of the inlong stream and source
- */
-@Data
-@ApiModel("Inlong stream page update request")
-public class FullPageUpdateRequest {
-
- @ApiModelProperty("Inlong stream info")
- private InlongStreamInfo streamInfo;
-
- @ApiModelProperty("Basic file info")
- private SourceFileBasicInfo fileBasicInfo;
-
- @ApiModelProperty("Basic db info")
- private SourceDbBasicInfo dbBasicInfo;
-
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
index 6ddc0c7..5468f6d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamRequest.java
@@ -21,10 +21,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import java.util.List;
@@ -35,22 +32,13 @@ import java.util.List;
@ApiModel("All request info on the inlong stream page")
public class FullStreamRequest {
- @ApiModelProperty("Inlong stream information")
- private InlongStreamInfo streamInfo;
+ @ApiModelProperty("Inlong stream info")
+ private InlongStreamRequest streamInfo;
- @ApiModelProperty("Basic information of file source")
- private SourceFileBasicInfo fileBasicInfo;
+ @ApiModelProperty("Source info list")
+ private List<SourceRequest> sourceInfo;
- @ApiModelProperty("File source details")
- private List<SourceFileDetailInfo> fileDetailInfoList;
-
- @ApiModelProperty("DB source basic information")
- private SourceDbBasicInfo dbBasicInfo;
-
- @ApiModelProperty("DB source details")
- private List<SourceDbDetailInfo> dbDetailInfoList;
-
- @ApiModelProperty("Sink information")
+ @ApiModelProperty("Sink info list")
private List<SinkRequest> sinkInfo;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
index 7343ca5..8fca538 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/FullStreamResponse.java
@@ -21,10 +21,6 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import java.util.List;
@@ -37,19 +33,7 @@ import java.util.List;
public class FullStreamResponse {
@ApiModelProperty("Inlong stream information")
- private InlongStreamInfo streamInfo;
-
- @ApiModelProperty("Basic information of file source")
- private SourceFileBasicInfo fileBasicInfo;
-
- @ApiModelProperty("File source details")
- private List<SourceFileDetailInfo> fileDetailInfoList;
-
- @ApiModelProperty("DB source basic information")
- private SourceDbBasicInfo dbBasicInfo;
-
- @ApiModelProperty("DB source details")
- private List<SourceDbDetailInfo> dbDetailInfoList;
+ private InlongStreamResponse streamInfo;
@ApiModelProperty("Stream source information")
private List<SourceResponse> sourceInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
similarity index 69%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index b82cb37..b23e42d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -17,22 +17,20 @@
package org.apache.inlong.manager.common.pojo.stream;
-import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.util.Date;
import java.util.List;
/**
- * Inlong stream info
+ * Inlong stream request.
*/
@Data
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream info")
-public class InlongStreamInfo extends InlongStreamBaseInfo {
+@ApiModel("Inlong stream page update request")
+public class InlongStreamRequest extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Primary key")
private Integer id;
@@ -47,19 +45,16 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
private String mqResourceObj;
- @ApiModelProperty(value = "Source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
- private String dataSourceType;
-
@ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
private Integer storagePeriod;
@ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
private String dataType;
- @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
- @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
private String dataSeparator;
@ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
@@ -68,7 +63,9 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
@ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
private Integer havePredefinedFields;
- @ApiModelProperty(value = "order_preserving 0: none, 1: yes")
+ @ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes",
+ notes = "Each task under this stream sends data synchronously, "
+ + "which will affect the throughput of data collection, please choose carefully")
private Integer syncSend;
@ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -86,32 +83,7 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Names of responsible persons, separated by commas")
private String inCharges;
- @ApiModelProperty(value = "Status")
- private Integer status;
-
- @ApiModelProperty(value = "Previous status")
- private Integer previousStatus;
-
- @ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
- private Integer isDeleted = 0;
-
- private String creator;
-
- private String modifier;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date createTime;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date modifyTime;
-
- @ApiModelProperty(value = "Temporary view, string in JSON format")
- private String tempView;
-
- @ApiModelProperty(value = "Extended information list")
- private List<InlongStreamExtInfo> extList;
-
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
similarity index 86%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index b82cb37..8eb80db 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -32,7 +32,7 @@ import java.util.List;
@Data
@EqualsAndHashCode(callSuper = true)
@ApiModel("Inlong stream info")
-public class InlongStreamInfo extends InlongStreamBaseInfo {
+public class InlongStreamResponse extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Primary key")
private Integer id;
@@ -47,19 +47,16 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
private String mqResourceObj;
- @ApiModelProperty(value = "Source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
- private String dataSourceType;
-
@ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
private Integer storagePeriod;
@ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
private String dataType;
- @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
- @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+ @ApiModelProperty(value = "Data separator, stored as ASCII code")
private String dataSeparator;
@ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
@@ -68,7 +65,9 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
@ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
private Integer havePredefinedFields;
- @ApiModelProperty(value = "order_preserving 0: none, 1: yes")
+ @ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes",
+ notes = "Each task under this stream sends data synchronously, "
+ + "which will affect the throughput of data collection, please choose carefully")
private Integer syncSend;
@ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -108,9 +107,6 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Temporary view, string in JSON format")
private String tempView;
- @ApiModelProperty(value = "Extended information list")
- private List<InlongStreamExtInfo> extList;
-
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
index ef7dd15..b2c8f99 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
@@ -22,7 +22,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.exceptions.FormValidateException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import java.util.List;
import java.util.Map;
@@ -40,7 +40,7 @@ public class GroupResourceProcessForm extends BaseProcessForm {
private String streamId;
- private List<InlongStreamInfo> inlongStreamInfoList;
+ private List<InlongStreamResponse> streamList;
public InlongGroupInfo getGroupInfo() {
return groupInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
index 2571a52..a367a7f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
@@ -25,7 +25,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.inlong.manager.common.exceptions.FormValidateException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.util.Preconditions;
import java.util.List;
@@ -44,7 +44,7 @@ public class UpdateGroupProcessForm extends BaseProcessForm {
@ApiModelProperty(value = "OperateType to define the update operation", required = true)
private OperateType operateType;
- private List<InlongStreamInfo> inlongStreamInfoList;
+ private List<InlongStreamResponse> streamList;
@Override
public void validate() throws FormValidateException {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index baec6e7..bc23767 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -227,7 +227,7 @@ public class CommonOperateService {
// Get source info
String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
- InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+ InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
groupInfo, streamInfo, sourceResponse, sourceFields);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
index ac0f294..5388a7d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongStreamService.java
@@ -18,14 +18,14 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import java.util.List;
@@ -33,18 +33,19 @@ import java.util.List;
/**
* Inlong stream service layer interface
*
- * @apiNote It is associated with various DataSources, the upstream is StreamSource, and the downstream is StreamSink
+ * @apiNote It is associated with various DataSources, the upstream is StreamSource, and the downstream is
+ * StreamSink
*/
public interface InlongStreamService {
/**
- * Save inlong stream information
+ * Save inlong stream information.
*
- * @param streamInfo Basic inlong stream information
- * @param operator Edit person's name
- * @return Inlong stream id after successful save
+ * @param request Inlong stream information.
+ * @param operator The name of operator.
+ * @return Id after successful save.
*/
- Integer save(InlongStreamInfo streamInfo, String operator);
+ Integer save(InlongStreamRequest request, String operator);
/**
* Query the details of the specified inlong stream
@@ -53,7 +54,7 @@ public interface InlongStreamService {
* @param streamId Inlong stream id
* @return inlong stream details
*/
- InlongStreamInfo get(String groupId, String streamId);
+ InlongStreamResponse get(String groupId, String streamId);
/**
* Query whether the inlong stream ID exists
@@ -75,11 +76,11 @@ public interface InlongStreamService {
/**
* InlongStream info that needs to be modified
*
- * @param streamInfo inlong stream information that needs to be modified
+ * @param request inlong stream info that needs to be modified
* @param operator Edit person's name
* @return whether succeed
*/
- boolean update(InlongStreamInfo streamInfo, String operator);
+ Boolean update(InlongStreamRequest request, String operator);
/**
* Delete the specified inlong stream
@@ -89,7 +90,7 @@ public interface InlongStreamService {
* @param operator Edit person's name
* @return whether succeed
*/
- boolean delete(String groupId, String streamId, String operator);
+ Boolean delete(String groupId, String streamId, String operator);
/**
* Logically delete all inlong streams under the specified groupId
@@ -98,7 +99,7 @@ public interface InlongStreamService {
* @param operator Edit person's name
* @return whether succeed
*/
- boolean logicDeleteAll(String groupId, String operator);
+ Boolean logicDeleteAll(String groupId, String operator);
/**
* Obtain the flow of inlong stream according to groupId
@@ -145,7 +146,7 @@ public interface InlongStreamService {
* @apiNote The data source details and data sink information are modified separately,
* not in this all modification interface
*/
- boolean updateAll(FullPageUpdateRequest updateInfo, String operator);
+ boolean updateAll(InlongStreamRequest updateInfo, String operator);
/**
* According to the group id, query the number of valid inlong streams belonging to this service
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 7ce1040..70dab08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.core.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
@@ -193,9 +193,9 @@ public class InlongGroupProcessOperation {
if (OperateType.RESTART == operateType) {
List<InlongStreamEntity> inlongStreamEntityList =
streamMapper.selectByGroupId(groupInfo.getInlongGroupId());
- List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
- InlongStreamInfo::new);
- form.setInlongStreamInfoList(inlongStreamInfoList);
+ List<InlongStreamResponse> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+ InlongStreamResponse::new);
+ form.setStreamList(streamList);
}
form.setGroupInfo(groupInfo);
form.setOperateType(operateType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 8315922..68f24c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -31,16 +31,17 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -66,6 +67,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@@ -97,11 +99,11 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public Integer save(InlongStreamInfo streamInfo, String operator) {
- LOGGER.debug("begin to save inlong stream info={}", streamInfo);
- Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
- String groupId = streamInfo.getInlongGroupId();
- String streamId = streamInfo.getInlongStreamId();
+ public Integer save(InlongStreamRequest request, String operator) {
+ LOGGER.debug("begin to save inlong stream info={}", request);
+ Preconditions.checkNotNull(request, "inlong stream info is empty");
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
@@ -114,29 +116,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.error("inlong stream id [{}] has already exists", streamId);
throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
}
- if (StringUtils.isEmpty(streamInfo.getMqResourceObj())) {
- streamInfo.setMqResourceObj(streamId);
+ if (StringUtils.isEmpty(request.getMqResourceObj())) {
+ request.setMqResourceObj(streamId);
}
// Processing inlong stream
- InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(streamInfo, InlongStreamEntity::new);
- Date date = new Date();
+ InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
streamEntity.setStatus(EntityStatus.STREAM_NEW.getCode());
- streamEntity.setModifier(operator);
- streamEntity.setCreateTime(date);
+ streamEntity.setCreator(operator);
+ streamEntity.setCreateTime(new Date());
streamMapper.insertSelective(streamEntity);
-
- // Processing extended information
- this.saveExt(groupId, streamId, streamInfo.getExtList(), date);
- // WorkflowProcess data source fields
- this.saveField(groupId, streamId, streamInfo.getFieldList());
+ this.saveField(groupId, streamId, request.getFieldList());
LOGGER.info("success to save inlong stream info for groupId={}", groupId);
return streamEntity.getId();
}
@Override
- public InlongStreamInfo get(String groupId, String streamId) {
+ public InlongStreamResponse get(String groupId, String streamId) {
LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
@@ -147,11 +144,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
- InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
- this.setStreamExtAndField(groupId, streamId, streamInfo);
+ InlongStreamResponse response = CommonBeanUtils.copyProperties(streamEntity, InlongStreamResponse::new);
+ List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, streamId);
+ response.setFieldList(streamFields);
LOGGER.info("success to get inlong stream for groupId={}", groupId);
- return streamInfo;
+ return response;
}
@Override
@@ -163,20 +161,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
/**
* Query and set the extended information and data source fields of the inlong stream
- *
- * @param groupId Inlong group id
- * @param streamId Inlong stream id
- * @param streamInfo Inlong stream that needs to be filled
*/
- private void setStreamExtAndField(String groupId, String streamId, InlongStreamInfo streamInfo) {
- List<InlongStreamExtEntity> extEntityList = streamExtMapper.selectByIdentifier(groupId, streamId);
- if (CollectionUtils.isNotEmpty(extEntityList)) {
- streamInfo.setExtList(CommonBeanUtils.copyListProperties(extEntityList, InlongStreamExtInfo::new));
- }
+ private List<InlongStreamFieldInfo> getStreamFields(String groupId, String streamId) {
List<InlongStreamFieldEntity> fieldEntityList = streamFieldMapper.selectByIdentifier(groupId, streamId);
- if (CollectionUtils.isNotEmpty(fieldEntityList)) {
- streamInfo.setFieldList(CommonBeanUtils.copyListProperties(fieldEntityList, InlongStreamFieldInfo::new));
+ if (CollectionUtils.isEmpty(fieldEntityList)) {
+ return Collections.emptyList();
}
+ return CommonBeanUtils.copyListProperties(fieldEntityList, InlongStreamFieldInfo::new);
}
@Override
@@ -217,34 +208,34 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean update(InlongStreamInfo streamInfo, String operator) {
- LOGGER.debug("begin to update inlong stream info={}", streamInfo);
- Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
- String groupId = streamInfo.getInlongGroupId();
+ public Boolean update(InlongStreamRequest request, String operator) {
+ LOGGER.debug("begin to update inlong stream info={}", request);
+ Preconditions.checkNotNull(request, "inlong stream request is empty");
+ String groupId = request.getInlongGroupId();
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- String streamId = streamInfo.getInlongStreamId();
+ String streamId = request.getInlongStreamId();
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
// Check if it can be modified
InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
- // Add if it doesn't exist, modify if it exists
+ // Make sure the stream was exists
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (streamEntity == null) {
- this.save(streamInfo, operator);
- } else {
- // Check whether the current inlong group status supports modification
- this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, streamInfo);
+ LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
+ throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
+ }
- CommonBeanUtils.copyProperties(streamInfo, streamEntity, true);
- streamEntity.setModifier(operator);
- streamEntity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
- streamMapper.updateByIdentifierSelective(streamEntity);
+ // Check whether the current inlong group status supports modification
+ this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, request);
- // Update extended information, field information
- this.updateExt(groupId, streamId, streamInfo.getExtList());
- this.updateField(groupId, streamId, streamInfo.getFieldList());
- }
+ CommonBeanUtils.copyProperties(request, streamEntity, true);
+ streamEntity.setModifier(operator);
+ streamEntity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
+ streamMapper.updateByIdentifierSelective(streamEntity);
+
+ // Update field information
+ this.updateField(groupId, streamId, request.getFieldList());
LOGGER.info("success to update inlong stream for groupId={}", groupId);
return true;
@@ -252,13 +243,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean delete(String groupId, String streamId, String operator) {
+ public Boolean delete(String groupId, String streamId, String operator) {
LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
// Check if it can be deleted
- InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
+ this.checkBizIsTempStatus(groupId);
InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, streamId);
if (entity == null) {
@@ -266,28 +257,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
- // If there is an undeleted data source, the deletion fails
- boolean dataSourceExist = hasDataSource(groupId, streamId, entity.getDataSourceType());
- if (dataSourceExist) {
- LOGGER.error("inlong stream has undeleted data sources, delete failed");
+ // If there is undeleted stream source, the deletion fails
+ Integer sourceCount = sourceService.getCount(groupId, streamId);
+ if (sourceCount > 0) {
+ LOGGER.error("inlong stream has undeleted sources, delete failed");
throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
}
- // If there is undeleted data sink information, the deletion fails
+ // If there is undeleted stream sink, the deletion fails
int sinkCount = sinkService.getCount(groupId, streamId);
if (sinkCount > 0) {
LOGGER.error("inlong stream has undeleted sinks, delete failed");
throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
}
- entity.setIsDeleted(1);
+ entity.setIsDeleted(entity.getId());
entity.setModifier(operator);
streamMapper.updateByPrimaryKey(entity);
- // To logically delete the associated extension table
- LOGGER.debug("begin to delete inlong stream ext property, groupId={}, streamId={}", groupId, streamId);
- streamExtMapper.logicDeleteAllByIdentifier(groupId, streamId);
-
// Logically delete the associated field table
LOGGER.debug("begin to delete inlong stream field, streamId={}", streamId);
streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
@@ -298,7 +285,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean logicDeleteAll(String groupId, String operator) {
+ public Boolean logicDeleteAll(String groupId, String operator) {
LOGGER.debug("begin to delete all inlong stream by groupId={}", groupId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
@@ -317,14 +304,9 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamMapper.updateByIdentifierSelective(entity);
String streamId = entity.getInlongStreamId();
- // To logically delete the associated extension table
- streamExtMapper.logicDeleteAllByIdentifier(groupId, streamId);
- // Logically delete the associated field table
+ // Logically delete the associated field, source and sink info
streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
- // Tombstone the associated data source
- sourceFileService.logicDeleteAllByIdentifier(groupId, streamId, operator);
- sourceDbService.logicDeleteAllByIdentifier(groupId, streamId, operator);
- // Logical deletion of associated data sink information
+ sourceService.logicDeleteAll(groupId, streamId, operator);
sinkService.logicDeleteAll(groupId, streamId, operator);
}
@@ -376,36 +358,23 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
}
Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
- InlongStreamInfo streamInfo = fullStreamRequest.getStreamInfo();
- Preconditions.checkNotNull(streamInfo, "inlong stream info is empty");
+ InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
+ Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
// Check whether it can be added: check by lower-level specific services
// this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
// 1. Save inlong stream
- this.save(streamInfo, operator);
+ this.save(streamRequest, operator);
- // 2.1 Save file data source information
- if (fullStreamRequest.getFileBasicInfo() != null) {
- sourceFileService.saveBasic(fullStreamRequest.getFileBasicInfo(), operator);
- }
- if (CollectionUtils.isNotEmpty(fullStreamRequest.getFileDetailInfoList())) {
- for (SourceFileDetailInfo detailInfo : fullStreamRequest.getFileDetailInfoList()) {
- sourceFileService.saveDetail(detailInfo, operator);
- }
- }
-
- // 2.2 Save DB data source information
- if (fullStreamRequest.getDbBasicInfo() != null) {
- sourceDbService.saveBasic(fullStreamRequest.getDbBasicInfo(), operator);
- }
- if (CollectionUtils.isNotEmpty(fullStreamRequest.getDbDetailInfoList())) {
- for (SourceDbDetailInfo detailInfo : fullStreamRequest.getDbDetailInfoList()) {
- sourceDbService.saveDetail(detailInfo, operator);
+ // 2 Save source info
+ if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
+ for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
+ sourceService.save(source, operator);
}
}
- // 3. Save data sink information
+ // 3. Save sink info
if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
sinkService.save(sinkInfo, operator);
@@ -425,7 +394,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
// Check if it can be added
- InlongStreamInfo firstStream = fullStreamRequestList.get(0).getStreamInfo();
+ InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
String groupId = firstStream.getInlongGroupId();
this.checkBizIsTempStatus(groupId);
@@ -438,17 +407,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
for (FullStreamRequest pageInfo : fullStreamRequestList) {
// 1.1 Delete the inlong stream extensions and fields corresponding to groupId and streamId
- InlongStreamInfo streamInfo = pageInfo.getStreamInfo();
+ InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
String streamId = streamInfo.getInlongStreamId();
-
- streamExtMapper.deleteAllByIdentifier(groupId, streamId);
streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
- // 2. Delete file data source, DB data source information
- sourceFileService.deleteAllByIdentifier(groupId, streamId);
- sourceDbService.deleteAllByIdentifier(groupId, streamId);
+ // 2. Delete all stream source
+ sourceService.deleteAll(groupId, streamId, operator);
- // 3. Delete data sink information
+ // 3. Delete all stream sink
sinkService.deleteAll(groupId, streamId, operator);
// 4. Save the inlong stream of this batch
@@ -477,16 +443,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<InlongStreamEntity> page = (Page<InlongStreamEntity>) streamMapper.selectByCondition(request);
- List<InlongStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamInfo::new);
+ List<InlongStreamResponse> streamInfoList = CommonBeanUtils.copyListProperties(page, InlongStreamResponse::new);
// Convert and encapsulate the paged results
List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
- for (InlongStreamInfo streamInfo : streamInfoList) {
- // 2.1 Set the extended information and field information of the inlong stream
+ for (InlongStreamResponse streamInfo : streamInfoList) {
+ // 2Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
- setStreamExtAndField(groupId, streamId, streamInfo);
+ List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
+ streamInfo.setFieldList(streamFields);
- // 2.3 Set the inlong stream to the result sub-object
FullStreamResponse pageInfo = new FullStreamResponse();
pageInfo.setStreamInfo(streamInfo);
@@ -511,28 +477,11 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean updateAll(FullPageUpdateRequest updateInfo, String operator) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to update all stream page info: {}", updateInfo);
- }
- Preconditions.checkNotNull(updateInfo, "updateInfo is empty");
- Preconditions.checkNotNull(updateInfo.getStreamInfo(), "inlong stream info is empty");
-
- // 1. Modify the inlong stream (inlong stream information cannot be empty)
- this.update(updateInfo.getStreamInfo(), operator);
-
- // 2. Modify the basic information of the file data source
- if (updateInfo.getFileBasicInfo() != null) {
- sourceFileService.updateBasic(updateInfo.getFileBasicInfo(), operator);
- }
-
- // 3. Save the basic information of the DB data source
- if (updateInfo.getDbBasicInfo() != null) {
- sourceDbService.updateBasic(updateInfo.getDbBasicInfo(), operator);
- }
-
- // TODO Update stream source info
+ public boolean updateAll(InlongStreamRequest request, String operator) {
+ LOGGER.info("begin to update all stream page info: " + request);
+ Preconditions.checkNotNull(request, "request is empty");
+ this.update(request, operator);
LOGGER.info("success to update all stream page info");
return true;
}
@@ -720,16 +669,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
*
* @param groupStatus Inlong group status
* @param streamEntity Original inlong stream entity
- * @param streamInfo New inlong stream information
+ * @param request New inlong stream information
*/
- private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamInfo streamInfo) {
- if (streamEntity == null || streamInfo == null) {
+ private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
+ if (streamEntity == null || request == null) {
return;
}
// Fields that are not allowed to be modified when the inlong group [configuration is successful]
if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupStatus)) {
- checkUpdatedFields(streamEntity, streamInfo);
+ checkUpdatedFields(streamEntity, request);
}
// Inlong group [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
@@ -739,14 +688,13 @@ public class InlongStreamServiceImpl implements InlongStreamService {
EntityStatus.GROUP_APPROVE_REJECTED.getCode(),
EntityStatus.GROUP_CONFIG_FAILED.getCode());
if (statusList.contains(groupStatus)) {
- String groupId = streamInfo.getInlongGroupId();
- String streamId = streamInfo.getInlongStreamId();
- // Whether there is an undeleted data source
- boolean dataSourceExist = hasDataSource(groupId, streamId, streamInfo.getDataSourceType());
- // Whether there is undeleted stream sink
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ // Whether there is undeleted stream source and sink
+ int sourceCount = sourceService.getCount(groupId, streamId);
int sinkCount = sinkService.getCount(groupId, streamId);
- if (dataSourceExist || sinkCount > 0) {
- checkUpdatedFields(streamEntity, streamInfo);
+ if (sourceCount > 0 || sinkCount > 0) {
+ checkUpdatedFields(streamEntity, request);
}
}
}
@@ -754,24 +702,18 @@ public class InlongStreamServiceImpl implements InlongStreamService {
/**
* Check that groupId, streamId, and dataSourceType are not allowed to be modified
*/
- private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamInfo streamInfo) {
- String newGroupId = streamInfo.getInlongGroupId();
+ private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamRequest request) {
+ String newGroupId = request.getInlongGroupId();
if (newGroupId != null && !newGroupId.equals(streamEntity.getInlongGroupId())) {
LOGGER.error("current status was not allowed to update inlong group id");
throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
}
- String newDsid = streamInfo.getInlongStreamId();
- if (newDsid != null && !newDsid.equals(streamEntity.getInlongStreamId())) {
+ String newStreamId = request.getInlongStreamId();
+ if (newStreamId != null && !newStreamId.equals(streamEntity.getInlongStreamId())) {
LOGGER.error("current status was not allowed to update inlong stream id");
throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
}
-
- String newSourceType = streamInfo.getDataSourceType();
- if (newSourceType != null && !newSourceType.equals(streamEntity.getDataSourceType())) {
- LOGGER.error("current status was not allowed to update data source type");
- throw new BusinessException(ErrorCodeEnum.STREAM_SOURCE_UPDATE_NOT_ALLOWED);
- }
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 16ccb9d..0d5f45b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -125,7 +125,7 @@ public class PushSortConfigListener implements SortOperateListener {
List<StreamSinkFieldEntity> fieldList) {
DeserializationInfo deserializationInfo = null;
String groupId = groupInfo.getInlongGroupId();
- InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+ InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
boolean isDbType = Constant.DATA_SOURCE_DB.equals(streamInfo.getDataType());
if (!isDbType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index cea3110..a2211b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
@@ -48,7 +48,7 @@ public class SerializationUtils {
* Create deserialization info
*/
public static DeserializationInfo createDeserialInfo(SourceResponse sourceResponse,
- InlongStreamInfo streamInfo) {
+ InlongStreamResponse streamInfo) {
SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
switch (sourceType) {
case BINLOG:
@@ -87,7 +87,7 @@ public class SerializationUtils {
/**
* Get deserialization info for Kafka
*/
- private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamInfo stream) {
+ private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamResponse stream) {
String serializationType = source.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
@@ -135,7 +135,8 @@ public class SerializationUtils {
/**
* Get deserialization info for File
*/
- private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
+ private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse,
+ InlongStreamResponse streamInfo) {
String serializationType = sourceResponse.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 13a74c8..66838ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -61,7 +61,7 @@ public class SourceInfoUtils {
* Create source info for DataFlowInfo.
*/
public static SourceInfo createSourceInfo(PulsarClusterInfo pulsarCluster, String masterAddress,
- ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+ ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamResponse streamInfo,
SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
String mqType = groupInfo.getMiddlewareType();
@@ -84,7 +84,7 @@ public class SourceInfoUtils {
* Create source info for Pulsar
*/
private static SourceInfo createPulsarSourceInfo(PulsarClusterInfo pulsarCluster, ClusterBean clusterBean,
- InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+ InlongGroupInfo groupInfo, InlongStreamResponse streamInfo,
DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
String topicName = streamInfo.getMqResourceObj();
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
index b693a65..37b5e30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -69,9 +69,9 @@ public class StartCreateGroupProcessListener implements ProcessEventListener {
processForm.setGroupInfo(groupService.get(groupId));
String username = context.getApplicant();
List<InlongStreamEntity> inlongStreamEntityList = streamMapper.selectByGroupId(groupId);
- List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
- InlongStreamInfo::new);
- processForm.setInlongStreamInfoList(inlongStreamInfoList);
+ List<InlongStreamResponse> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+ InlongStreamResponse::new);
+ processForm.setStreamList(streamList);
workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
index 5eb136c..88b98a8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
@@ -17,7 +17,8 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.junit.Assert;
import org.junit.Test;
@@ -44,11 +45,11 @@ public class InlongStreamServiceTest {
* Test save inlong stream
*/
public Integer saveInlongStream(String groupId, String streamId, String operator) {
- InlongStreamInfo streamInfo;
+ ;
try {
- streamInfo = streamService.get(groupId, streamId);
- if (streamInfo != null) {
- return streamInfo.getId();
+ InlongStreamResponse response = streamService.get(groupId, streamId);
+ if (response != null) {
+ return response.getId();
}
} catch (Exception e) {
// ignore
@@ -56,12 +57,12 @@ public class InlongStreamServiceTest {
groupServiceTest.saveGroup(globalGroupName, operator);
- streamInfo = new InlongStreamInfo();
- streamInfo.setInlongGroupId(groupId);
- streamInfo.setInlongStreamId(streamId);
- streamInfo.setDataEncoding("UTF-8");
+ InlongStreamRequest request = new InlongStreamRequest();
+ request.setInlongGroupId(groupId);
+ request.setInlongStreamId(streamId);
+ request.setDataEncoding("UTF-8");
- return streamService.save(streamInfo, operator);
+ return streamService.save(request, operator);
}
@Test
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 539f256..367baed 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
@@ -51,10 +51,10 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
private StreamSourceService streamSourceService;
public Integer createBinlogSource(InlongGroupInfo groupInfo) {
- final InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+ final InlongStreamResponse stream = createStreamInfo(groupInfo);
BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
- sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
- sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setInlongGroupId(stream.getInlongGroupId());
+ sourceRequest.setInlongStreamId(stream.getInlongStreamId());
sourceRequest.setSourceName("binlog-collect");
return streamSourceService.save(sourceRequest, OPERATOR);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
index cdf2ffd..653a56b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/sort/DisableZkForSortTest.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -63,7 +63,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
@Autowired
protected StreamSourceService streamSourceService;
- public HiveSinkRequest createHiveSink(InlongStreamInfo streamInfo) {
+ public HiveSinkRequest createHiveSink(InlongStreamResponse streamInfo) {
HiveSinkRequest hiveSinkRequest = new HiveSinkRequest();
hiveSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
hiveSinkRequest.setSinkType("HIVE");
@@ -95,7 +95,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
return hiveSinkRequest;
}
- public KafkaSourceRequest createKafkaSource(InlongStreamInfo streamInfo) {
+ public KafkaSourceRequest createKafkaSource(InlongStreamResponse streamInfo) {
KafkaSourceRequest kafkaSourceRequest = new KafkaSourceRequest();
kafkaSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
kafkaSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
@@ -112,7 +112,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
groupInfo.setZookeeperEnabled(0);
groupService.update(groupInfo.genRequest(), OPERATOR);
- InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+ InlongStreamResponse streamInfo = createStreamInfo(groupInfo);
createHiveSink(streamInfo);
createKafkaSource(streamInfo);
mockTaskListenerFactory();
@@ -139,7 +139,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
groupInfo.setZookeeperEnabled(0);
groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
groupService.update(groupInfo.genRequest(), OPERATOR);
- InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+ InlongStreamResponse streamInfo = createStreamInfo(groupInfo);
createHiveSink(streamInfo);
createKafkaSource(streamInfo);
UpdateGroupProcessForm form = new UpdateGroupProcessForm();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 1f25978..9cbb07a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -25,7 +25,8 @@ import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskExecuteLogQuery;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -148,7 +149,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
/**
* Create inlong stream
*/
- public InlongStreamInfo createStreamInfo(InlongGroupInfo inlongGroupInfo) {
+ public InlongStreamResponse createStreamInfo(InlongGroupInfo groupInfo) {
// delete first
try {
streamService.delete(GROUP_ID, OPERATOR, OPERATOR);
@@ -156,22 +157,24 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
// ignore
}
- InlongStreamInfo streamInfo = new InlongStreamInfo();
- streamInfo.setInlongGroupId(inlongGroupInfo.getInlongGroupId());
- streamInfo.setInlongStreamId(STREAM_ID);
- streamInfo.setMqResourceObj(STREAM_ID);
- streamInfo.setDataSeparator("124");
- streamInfo.setDataEncoding(DATA_ENCODING);
- streamInfo.setInCharges(OPERATOR);
- streamInfo.setCreator(OPERATOR);
- streamInfo.setFieldList(createStreamFields(inlongGroupInfo.getInlongGroupId(), STREAM_ID));
- streamService.save(streamInfo, OPERATOR);
- return streamInfo;
+ InlongStreamRequest request = new InlongStreamRequest();
+ request.setInlongGroupId(groupInfo.getInlongGroupId());
+ request.setInlongStreamId(STREAM_ID);
+ request.setMqResourceObj(STREAM_ID);
+ request.setDataSeparator("124");
+ request.setDataEncoding(DATA_ENCODING);
+ request.setInCharges(OPERATOR);
+ request.setFieldList(createStreamFields(groupInfo.getInlongGroupId(), STREAM_ID));
+ streamService.save(request, OPERATOR);
+
+ return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
}
- public List<InlongStreamFieldInfo> createStreamFields(String inlongGroupId, String inlongStreamId) {
+ public List<InlongStreamFieldInfo> createStreamFields(String groupId, String streamId) {
final List<InlongStreamFieldInfo> streamFieldInfos = new ArrayList<>();
InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
+ fieldInfo.setInlongGroupId(groupId);
+ fieldInfo.setInlongStreamId(streamId);
fieldInfo.setFieldName("id");
fieldInfo.setFieldType("int");
fieldInfo.setFieldComment("idx");
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 143a441..1454268 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -24,12 +24,12 @@ import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -58,8 +58,8 @@ public class InlongStreamController {
@RequestMapping(value = "/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save inlong stream info")
- public Response<Integer> save(@RequestBody InlongStreamInfo streamInfo) {
- int result = streamService.save(streamInfo, LoginUserUtils.getLoginUserDetail().getUserName());
+ public Response<Integer> save(@RequestBody InlongStreamRequest request) {
+ int result = streamService.save(request, LoginUserUtils.getLoginUserDetail().getUserName());
return Response.success(result);
}
@@ -84,7 +84,7 @@ public class InlongStreamController {
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
})
- public Response<InlongStreamInfo> get(@RequestParam String groupId, @RequestParam String streamId) {
+ public Response<InlongStreamResponse> get(@RequestParam String groupId, @RequestParam String streamId) {
return Response.success(streamService.get(groupId, streamId));
}
@@ -105,16 +105,16 @@ public class InlongStreamController {
@RequestMapping(value = "/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update inlong stream info")
- public Response<Boolean> update(@RequestBody InlongStreamInfo streamInfo) {
+ public Response<Boolean> update(@RequestBody InlongStreamRequest request) {
String username = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(streamService.update(streamInfo, username));
+ return Response.success(streamService.update(request, username));
}
@RequestMapping(value = "/updateAll", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update inlong stream page info")
- public Response<Boolean> updateAll(@RequestBody FullPageUpdateRequest updateInfo) {
- boolean result = streamService.updateAll(updateInfo, LoginUserUtils.getLoginUserDetail().getUserName());
+ public Response<Boolean> updateAll(@RequestBody InlongStreamRequest request) {
+ boolean result = streamService.updateAll(request, LoginUserUtils.getLoginUserDetail().getUserName());
return Response.success(result);
}
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index aaf3e7d..ac33d7e 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -116,6 +116,10 @@
<artifactId>android-json</artifactId>
<groupId>com.vaadin.external.google</groupId>
</exclusion>
+ <exclusion>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -448,6 +452,7 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>${spring-boot.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>