You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/13 08:02:46 UTC
[incubator-inlong] branch master updated: [INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 05aa6bd65 [INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)
05aa6bd65 is described below
commit 05aa6bd6511503772c79b1d2173059f9415c777e
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri May 13 16:02:40 2022 +0800
[INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)
---
.../api/impl/DefaultInlongStreamBuilder.java | 4 +--
.../client/api/inner/InnerInlongManagerClient.java | 30 +++++++++++++++++-----
.../service/core/impl/InlongStreamServiceImpl.java | 1 +
.../web/controller/InlongStreamController.java | 10 ++++++++
4 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c744ac407..f3bd61cae 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -146,8 +146,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
dataStreamInfo.setTempView(GsonUtil.toJson(streamPipeline));
- Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
- if (existMsg.getKey()) {
+ Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
+ if (isExist) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index daffd1e4b..749feb468 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -306,12 +306,30 @@ public class InnerInlongManagerClient {
}
}
- public Pair<Boolean, InlongStreamInfo> isStreamExists(InlongStreamInfo streamInfo) {
- InlongStreamInfo currentStreamInfo = getStreamInfo(streamInfo);
- if (currentStreamInfo != null) {
- return Pair.of(true, currentStreamInfo);
- } else {
- return Pair.of(false, null);
+ public Boolean isStreamExists(InlongStreamInfo streamInfo) {
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ AssertUtil.notEmpty(groupId, "InlongGroupId should not be empty");
+ AssertUtil.notEmpty(streamId, "InlongStreamId should not be empty");
+ String path = HTTP_PATH + "/stream/exist/" + groupId + "/" + streamId;
+ final String url = formatUrl(path);
+ Request request = new Request.Builder().get()
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ if (responseBody.getErrMsg() != null) {
+ throw new RuntimeException(responseBody.getErrMsg());
+ }
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong stream check exists failed: %s", e.getMessage()), e);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index b3eaa952d..8e1d935c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -164,6 +164,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public Boolean exist(String groupId, String streamId) {
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
return streamEntity != null;
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 2f5f7a08a..fd36fe350 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -79,6 +79,16 @@ public class InlongStreamController {
return Response.success(result);
}
+ @RequestMapping(value = "/exist/{groupId}/{streamId}", method = RequestMethod.GET)
+ @ApiOperation(value = "Is exists of the inlong stream")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
+ })
+ public Response<Boolean> exist(@PathVariable String groupId, @PathVariable String streamId) {
+ return Response.success(streamService.exist(groupId, streamId));
+ }
+
@RequestMapping(value = "/get", method = RequestMethod.GET)
@ApiOperation(value = "Get inlong stream info")
@ApiImplicitParams({