You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/02 12:28:13 UTC
[inlong] branch master updated: [INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a735368fc [INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
a735368fc is described below
commit a735368fc2739c43dab332443cc1e3cb17021612
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Aug 2 20:28:08 2022 +0800
[INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
---
.../inlong/manager/client/api/InlongStream.java | 2 +-
.../api/inner/client/InlongStreamClient.java | 102 ++++++++++++++++++++-
.../client/api/service/InlongStreamApi.java | 19 ++++
3 files changed, 121 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 62fe52db5..d0e0897de 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -62,7 +62,7 @@ public interface InlongStream {
StreamSink getSinkInfoByName(String sinkName);
/**
- * Return data transform node defined in stream(split,string replace etc)
+ * Return data transform node defined in stream(split, string replace etc.)
* key is transform name which must be unique within one stream scope.
*/
Map<String, StreamTransform> getTransforms();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index f54cbd86f..ad24f03bc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -22,10 +22,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.InlongStreamApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
-import org.apache.inlong.manager.common.util.Preconditions;
import java.util.List;
@@ -49,6 +50,12 @@ public class InlongStreamClient {
return response.getData();
}
+ /**
+ * Query whether the inlong stream ID exists
+ *
+ * @param streamInfo inlong stream info
+ * @return true: exists, false: does not exist
+ */
public Boolean isStreamExists(InlongStreamInfo streamInfo) {
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
@@ -60,6 +67,12 @@ public class InlongStreamClient {
return response.getData();
}
+ /**
+ * InlongStream info that needs to be modified
+ *
+ * @param streamInfo inlong stream info that needs to be modified
+ * @return whether succeed
+ */
public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) {
Response<Boolean> resp = ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
@@ -86,6 +99,19 @@ public class InlongStreamClient {
}
}
+ /**
+ * Paging query inlong stream brief info list
+ *
+ * @param request query request
+ * @return inlong stream brief list
+ */
+ public PageInfo<InlongStreamBriefInfo> listByCondition(InlongStreamPageRequest request) {
+ Response<PageInfo<InlongStreamBriefInfo>> response = ClientUtils.executeHttpCall(
+ inlongStreamApi.listByCondition(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Get inlong stream info.
*/
@@ -99,4 +125,78 @@ public class InlongStreamClient {
return response.getData().getList();
}
+ /**
+ * Create stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean startProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.startProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Suspend stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean suspendProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.suspendProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Restart stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean restartProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.restartProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Delete stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean deleteProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.deleteProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Delete the specified inlong stream
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean delete(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.delete(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 5e5f36bd2..83ab31759 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.client.api.service;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import retrofit2.Call;
import retrofit2.http.Body;
+import retrofit2.http.DELETE;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;
@@ -43,7 +45,24 @@ public interface InlongStreamApi {
Call<Response<InlongStreamInfo>> getStream(@Query("groupId") String groupId,
@Query("streamId") String streamId);
+ @POST("/stream/list")
+ Call<Response<PageInfo<InlongStreamBriefInfo>>> listByCondition(@Body InlongStreamPageRequest request);
+
@POST("stream/listAll")
Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body InlongStreamPageRequest request);
+ @POST("/stream/startProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> startProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+ @POST("/stream/suspendProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> suspendProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+ @POST("/stream/restartProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> restartProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+ @POST("/stream/deleteProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> deleteProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
+
+ @DELETE("/stream/delete")
+ Call<Response<Boolean>> delete(@Path("groupId") String groupId, @Path("streamId") String streamId);
}