You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/13 08:02:46 UTC

[incubator-inlong] branch master updated: [INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 05aa6bd65 [INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)
05aa6bd65 is described below

commit 05aa6bd6511503772c79b1d2173059f9415c777e
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri May 13 16:02:40 2022 +0800

    [INLONG-4188][Manager] Check whether the stream exists in the manager client (#4190)
---
 .../api/impl/DefaultInlongStreamBuilder.java       |  4 +--
 .../client/api/inner/InnerInlongManagerClient.java | 30 +++++++++++++++++-----
 .../service/core/impl/InlongStreamServiceImpl.java |  1 +
 .../web/controller/InlongStreamController.java     | 10 ++++++++
 4 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c744ac407..f3bd61cae 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -146,8 +146,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
         StreamPipeline streamPipeline = inlongStream.createPipeline();
         dataStreamInfo.setTempView(GsonUtil.toJson(streamPipeline));
-        Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
-        if (existMsg.getKey()) {
+        Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
+        if (isExist) {
             Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
             if (!updateMsg.getKey()) {
                 throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index daffd1e4b..749feb468 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -306,12 +306,30 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public Pair<Boolean, InlongStreamInfo> isStreamExists(InlongStreamInfo streamInfo) {
-        InlongStreamInfo currentStreamInfo = getStreamInfo(streamInfo);
-        if (currentStreamInfo != null) {
-            return Pair.of(true, currentStreamInfo);
-        } else {
-            return Pair.of(false, null);
+    /**
+     * Asks the manager endpoint HTTP_PATH + "/stream/exist/{groupId}/{streamId}" whether the stream exists.
+     * Throws RuntimeException when the call fails or the manager response carries an error message.
+     */
+    public Boolean isStreamExists(InlongStreamInfo streamInfo) {
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        AssertUtil.notEmpty(groupId, "InlongGroupId should not be empty");
+        AssertUtil.notEmpty(streamId, "InlongStreamId should not be empty");
+        String path = HTTP_PATH + "/stream/exist/" + groupId + "/" + streamId;
+        Request request = new Request.Builder().get().url(formatUrl(path)).build();
+        // try-with-resources closes the response on every path, including failures before the body is read
+        try (okhttp3.Response response = httpClient.newCall(request).execute()) {
+            // explicit check: a plain `assert` is skipped unless the JVM runs with -ea
+            okhttp3.ResponseBody rawBody = java.util.Objects.requireNonNull(response.body(), "response body is null");
+            String body = rawBody.string();
+            assertHttpSuccess(response, body, path);
+            Response responseBody = InlongParser.parseResponse(body);
+            if (responseBody.getErrMsg() != null) {
+                throw new RuntimeException(responseBody.getErrMsg());
+            }
+            return Boolean.parseBoolean(responseBody.getData().toString());
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong stream check exists failed: %s", e.getMessage()), e);
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index b3eaa952d..8e1d935c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -164,6 +164,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     public Boolean exist(String groupId, String streamId) {
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         return streamEntity != null;
     }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 2f5f7a08a..fd36fe350 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -79,6 +79,16 @@ public class InlongStreamController {
         return Response.success(result);
     }
 
+    /** Returns the result of streamService.exist(groupId, streamId) wrapped in a success response. */
+    @ApiOperation(value = "Is exists of the inlong stream")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)})
+    @RequestMapping(value = "/exist/{groupId}/{streamId}", method = RequestMethod.GET)
+    public Response<Boolean> exist(@PathVariable String groupId, @PathVariable String streamId) {
+        return Response.success(streamService.exist(groupId, streamId));
+    }
+
     @RequestMapping(value = "/get", method = RequestMethod.GET)
     @ApiOperation(value = "Get inlong stream info")
     @ApiImplicitParams({