You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/02 12:28:13 UTC

[inlong] branch master updated: [INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a735368fc [INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
a735368fc is described below

commit a735368fc2739c43dab332443cc1e3cb17021612
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Aug 2 20:28:08 2022 +0800

    [INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
---
 .../inlong/manager/client/api/InlongStream.java    |   2 +-
 .../api/inner/client/InlongStreamClient.java       | 102 ++++++++++++++++++++-
 .../client/api/service/InlongStreamApi.java        |  19 ++++
 3 files changed, 121 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 62fe52db5..d0e0897de 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -62,7 +62,7 @@ public interface InlongStream {
     StreamSink getSinkInfoByName(String sinkName);
 
     /**
-     * Return data transform node defined in stream(split,string replace etc)
+     * Return data transform node defined in stream(split, string replace etc.)
      * key is transform name which must be unique within one stream scope.
      */
     Map<String, StreamTransform> getTransforms();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index f54cbd86f..ad24f03bc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -22,10 +22,11 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.InlongStreamApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
-import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.List;
 
@@ -49,6 +50,12 @@ public class InlongStreamClient {
         return response.getData();
     }
 
+    /**
+     * Query whether the inlong stream ID exists
+     *
+     * @param streamInfo inlong stream info
+     * @return true: exists, false: does not exist
+     */
     public Boolean isStreamExists(InlongStreamInfo streamInfo) {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
@@ -60,6 +67,12 @@ public class InlongStreamClient {
         return response.getData();
     }
 
+    /**
+     * Update the inlong stream info
+     *
+     * @param streamInfo inlong stream info that needs to be modified
+     * @return whether succeed
+     */
     public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) {
         Response<Boolean> resp = ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
 
@@ -86,6 +99,19 @@ public class InlongStreamClient {
         }
     }
 
+    /**
+     * Fetch one page of inlong stream brief info matching the request.
+     *
+     * @param request paging query conditions
+     * @return page of inlong stream brief info taken from the response data
+     */
+    public PageInfo<InlongStreamBriefInfo> listByCondition(InlongStreamPageRequest request) {
+        Response<PageInfo<InlongStreamBriefInfo>> pageResponse =
+                ClientUtils.executeHttpCall(inlongStreamApi.listByCondition(request));
+        ClientUtils.assertRespSuccess(pageResponse);
+        return pageResponse.getData();
+    }
+
     /**
      * Get inlong stream info.
      */
@@ -99,4 +125,78 @@ public class InlongStreamClient {
         return response.getData().getList();
     }
 
+    /**
+     * Call the Manager's startProcess endpoint for the given inlong stream.
+     *
+     * @param groupId inlong group id, must not be empty
+     * @param streamId inlong stream id, must not be empty
+     * @return true only if the response data is true; missing data counts as false
+     */
+    public boolean startProcess(String groupId, String streamId) {
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.startProcess(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return Boolean.TRUE.equals(response.getData()); // null-safe: avoids an NPE from auto-unboxing
+    }
+
+    /**
+     * Call the Manager's suspendProcess endpoint for the given inlong stream.
+     *
+     * @param groupId inlong group id, must not be empty
+     * @param streamId inlong stream id, must not be empty
+     * @return true only if the response data is true; missing data counts as false
+     */
+    public boolean suspendProcess(String groupId, String streamId) {
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.suspendProcess(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return Boolean.TRUE.equals(response.getData()); // null-safe: avoids an NPE from auto-unboxing
+    }
+
+    /**
+     * Call the Manager's restartProcess endpoint for the given inlong stream.
+     *
+     * @param groupId inlong group id, must not be empty
+     * @param streamId inlong stream id, must not be empty
+     * @return true only if the response data is true; missing data counts as false
+     */
+    public boolean restartProcess(String groupId, String streamId) {
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.restartProcess(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return Boolean.TRUE.equals(response.getData()); // null-safe: avoids an NPE from auto-unboxing
+    }
+
+    /**
+     * Call the Manager's deleteProcess endpoint for the given inlong stream.
+     *
+     * @param groupId inlong group id, must not be empty
+     * @param streamId inlong stream id, must not be empty
+     * @return true only if the response data is true; missing data counts as false
+     */
+    public boolean deleteProcess(String groupId, String streamId) {
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.deleteProcess(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return Boolean.TRUE.equals(response.getData()); // null-safe: avoids an NPE from auto-unboxing
+    }
+
+    /**
+     * Delete the specified inlong stream through the Manager's delete endpoint.
+     *
+     * @param groupId inlong group id, must not be empty
+     * @param streamId inlong stream id, must not be empty
+     * @return true only if the response data is true; missing data counts as false
+     */
+    public boolean delete(String groupId, String streamId) {
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.delete(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return Boolean.TRUE.equals(response.getData()); // null-safe: avoids an NPE from auto-unboxing
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 5e5f36bd2..83ab31759 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.client.api.service;
 
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import retrofit2.Call;
 import retrofit2.http.Body;
+import retrofit2.http.DELETE;
 import retrofit2.http.GET;
 import retrofit2.http.POST;
 import retrofit2.http.Path;
@@ -43,7 +45,24 @@ public interface InlongStreamApi {
     Call<Response<InlongStreamInfo>> getStream(@Query("groupId") String groupId,
             @Query("streamId") String streamId);
 
+    @POST("stream/list") // relative path, so it is resolved under the client's base URL
+    Call<Response<PageInfo<InlongStreamBriefInfo>>> listByCondition(@Body InlongStreamPageRequest request);
+
     @POST("stream/listAll") // relative path, so it is resolved under the client's base URL
     Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body InlongStreamPageRequest request);
 
+    @POST("stream/startProcess/{groupId}/{streamId}") // relative path, resolved under the base URL
+    Call<Response<Boolean>> startProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+    @POST("stream/suspendProcess/{groupId}/{streamId}") // relative path, resolved under the base URL
+    Call<Response<Boolean>> suspendProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+    @POST("stream/restartProcess/{groupId}/{streamId}") // relative path, resolved under the base URL
+    Call<Response<Boolean>> restartProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+    @POST("stream/deleteProcess/{groupId}/{streamId}") // relative path, resolved under the base URL
+    Call<Response<Boolean>> deleteProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+    @DELETE("stream/delete") // URL has no {placeholders}, so both ids are sent as query parameters
+    Call<Response<Boolean>> delete(@Query("groupId") String groupId, @Query("streamId") String streamId);
 }