You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/15 12:07:43 UTC

[incubator-inlong] branch master updated: [INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete (#3150)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 11804b5  [INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete  (#3150)
11804b5 is described below

commit 11804b513b4b9e9fa5e49b2804a0aa62f753f48e
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue Mar 15 20:07:01 2022 +0800

    [INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete  (#3150)
---
 .../inlong/manager/client/api/InlongGroup.java     | 23 +++++++-
 .../manager/client/api/impl/BlankInlongGroup.java  | 15 +++++
 .../manager/client/api/impl/InlongGroupImpl.java   | 21 ++++++-
 .../client/api/inner/InnerInlongManagerClient.java | 27 ++++++++-
 .../core/impl/InlongGroupProcessOperation.java     | 66 +++++++++++++++++++++-
 .../web/controller/InlongGroupController.java      | 43 +++++++++++---
 6 files changed, 176 insertions(+), 19 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 652a2be..94f94c7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -32,7 +32,6 @@ public interface InlongGroup {
      * Create snapshot for Inlong group
      *
      * @return
-     *
      * @throws Exception
      */
     InlongGroupContext context() throws Exception;
@@ -50,7 +49,6 @@ public interface InlongGroup {
      * Update Inlong group on updated conf
      *
      * @return
-     *
      * @throws Exception
      */
     void update(InlongGroupConf conf) throws Exception;
@@ -71,6 +69,13 @@ public interface InlongGroup {
     InlongGroupContext suspend() throws Exception;
 
     /**
+     * Suspend the stream group and return group info.
+     *
+     * @return group info
+     */
+    InlongGroupContext suspend(boolean async) throws Exception;
+
+    /**
      * Restart the stream group and return group info.
      *
      * @return group info
@@ -78,6 +83,13 @@ public interface InlongGroup {
     InlongGroupContext restart() throws Exception;
 
     /**
+     * Restart the stream group and return group info.
+     *
+     * @return group info
+     */
+    InlongGroupContext restart(boolean async) throws Exception;
+
+    /**
      * delete the stream group and return group info
      *
      * @return group info
@@ -85,6 +97,13 @@ public interface InlongGroup {
     InlongGroupContext delete() throws Exception;
 
     /**
+     * delete the stream group and return group info
+     *
+     * @return group info
+     */
+    InlongGroupContext delete(boolean async) throws Exception;
+
+    /**
      * List all inlong streams in certain group
      *
      * @return inlong stream contained in this group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index 37c62cb..d94521f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -59,16 +59,31 @@ public class BlankInlongGroup implements InlongGroup {
     }
 
     @Override
+    public InlongGroupContext suspend(boolean async) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
     public InlongGroupContext restart() throws Exception {
         throw new UnsupportedOperationException("Inlong group is not exists");
     }
 
     @Override
+    public InlongGroupContext restart(boolean async) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
     public InlongGroupContext delete() throws Exception {
         throw new UnsupportedOperationException("Inlong group is not exists");
     }
 
     @Override
+    public InlongGroupContext delete(boolean async) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
     public List<InlongStream> listStreams() throws Exception {
         throw new UnsupportedOperationException("Inlong group is not exists");
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8f96a07..17fc05c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -150,31 +150,46 @@ public class InlongGroupImpl implements InlongGroup {
 
     @Override
     public InlongGroupContext suspend() throws Exception {
+        return suspend(false);
+    }
+
+    @Override
+    public InlongGroupContext suspend(boolean async) throws Exception {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
         Pair<String, String> idAndErr = managerClient.updateGroup(groupInfo.genRequest());
         final String errMsg = idAndErr.getValue();
         final String groupId = idAndErr.getKey();
         AssertUtil.isNull(errMsg, errMsg);
-        managerClient.operateInlongGroup(groupId, InlongGroupState.STOPPED);
+        managerClient.operateInlongGroup(groupId, InlongGroupState.STOPPED, async);
         return generateSnapshot();
     }
 
     @Override
     public InlongGroupContext restart() throws Exception {
+        return restart(false);
+    }
+
+    @Override
+    public InlongGroupContext restart(boolean async) throws Exception {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
         Pair<String, String> idAndErr = managerClient.updateGroup(groupInfo.genRequest());
         final String errMsg = idAndErr.getValue();
         final String groupId = idAndErr.getKey();
         AssertUtil.isNull(errMsg, errMsg);
-        managerClient.operateInlongGroup(groupId, InlongGroupState.STARTED);
+        managerClient.operateInlongGroup(groupId, InlongGroupState.STARTED, async);
         return generateSnapshot();
     }
 
     @Override
     public InlongGroupContext delete() throws Exception {
+        return delete(false);
+    }
+
+    @Override
+    public InlongGroupContext delete(boolean async) throws Exception {
         InlongGroupResponse groupResponse = managerClient.getGroupInfo(
                 groupContext.getGroupId());
-        boolean isDeleted = managerClient.deleteInlongGroup(groupResponse.getInlongGroupId());
+        boolean isDeleted = managerClient.deleteInlongGroup(groupResponse.getInlongGroupId(), async);
         if (isDeleted) {
             groupResponse.setStatus(GroupState.DELETED.getCode());
         }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 8c8c4d7..8db3ca1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -567,11 +567,23 @@ public class InnerInlongManagerClient {
     }
 
     public boolean operateInlongGroup(String groupId, InlongGroupState status) {
+        return operateInlongGroup(groupId, status, false);
+    }
+
+    public boolean operateInlongGroup(String groupId, InlongGroupState status, boolean async) {
         String path = HTTP_PATH;
         if (status == InlongGroupState.STOPPED) {
-            path += "/group/suspendProcess/";
+            if (async) {
+                path += "/group/suspendProcessAsync/";
+            } else {
+                path += "/group/suspendProcess/";
+            }
         } else if (status == InlongGroupState.STARTED) {
-            path += "/group/restartProcess/";
+            if (async) {
+                path += "/group/restartProcessAsync/";
+            } else {
+                path += "/group/restartProcess/";
+            }
         } else {
             throw new IllegalArgumentException(String.format("Unsupported state: %s", status));
         }
@@ -599,7 +611,16 @@ public class InnerInlongManagerClient {
     }
 
     public boolean deleteInlongGroup(String groupId) {
-        final String path = HTTP_PATH + "/group/delete/" + groupId;
+        return deleteInlongGroup(groupId, false);
+    }
+
+    public boolean deleteInlongGroup(String groupId, boolean async) {
+        String path = HTTP_PATH;
+        if (async) {
+            path += "/group/deleteAsync/" + groupId;
+        } else {
+            path += "/group/delete/" + groupId;
+        }
         final String url = formatUrl(path);
         RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
         Request request = new Request.Builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 6e9a41e..7ce1040 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -38,6 +39,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Operation related to inlong group process
@@ -46,13 +52,22 @@ import java.util.List;
 public class InlongGroupProcessOperation {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessOperation.class);
+
+    private final ExecutorService executorService = new ThreadPoolExecutor(
+            20,
+            40,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(),
+            new CallerRunsPolicy());
+
     @Autowired
     private InlongGroupService groupService;
     @Autowired
     private WorkflowService workflowService;
     @Autowired
     private InlongStreamService streamService;
-
     @Autowired
     private InlongStreamEntityMapper streamMapper;
 
@@ -72,6 +87,21 @@ public class InlongGroupProcessOperation {
     }
 
     /**
+     * Suspend resource application group in an asynchronous way,
+     * stop source and sort task related to application group asynchronously,
+     * persist the application status if necessary.
+     *
+     * @return groupId
+     */
+    public String suspendProcessAsync(String groupId, String operator) {
+        LOGGER.info("begin to suspend process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        groupService.updateStatus(groupId, GroupState.SUSPENDING.getCode(), operator);
+        UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.SUSPEND);
+        executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+        return groupId;
+    }
+
+    /**
      * Suspend resource application group which is started up successfully,
      * stop source and sort task related to application group asynchronously,
      * persist the application status if necessary.
@@ -86,6 +116,20 @@ public class InlongGroupProcessOperation {
     }
 
     /**
+     * Restart resource application group in an asynchronous way,
+     * starting from the last persist snapshot.
+     *
+     * @return Workflow result
+     */
+    public String restartProcessAsync(String groupId, String operator) {
+        LOGGER.info("begin to restart process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        groupService.updateStatus(groupId, GroupState.RESTARTING.getCode(), operator);
+        UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.RESTART);
+        executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+        return groupId;
+    }
+
+    /**
      * Restart resource application group which is suspended successfully,
      * starting from the last persist snapshot.
      *
@@ -99,7 +143,25 @@ public class InlongGroupProcessOperation {
     }
 
     /**
-     * Delete resource application group logically and delete related resource
+     * Delete resource application group logically and delete related resource in an
+     */
+    public String deleteProcessAsync(String groupId, String operator) {
+        LOGGER.info("begin to delete process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        executorService.execute(() -> {
+            try {
+                UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.DELETE);
+                workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
+            } catch (Exception ex) {
+                LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+                throw ex;
+            }
+            groupService.delete(groupId, operator);
+        });
+        return groupId;
+    }
+
+    /**
+     * Delete resource application group logically and delete related resource in an asynchronous way
      */
     public boolean deleteProcess(String groupId, String operator) {
         LOGGER.info("begin to delete process, groupId = {}, operator = {}", groupId, operator);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 8036553..ff3176d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -84,15 +84,6 @@ public class InlongGroupController {
         return Response.success(groupService.update(groupInfo, operator));
     }
 
-    @RequestMapping(value = "/delete/{groupId}", method = RequestMethod.DELETE)
-    @ApiOperation(value = "Delete inlong group information")
-    @OperationLog(operation = OperationType.DELETE)
-    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
-    public Response<Boolean> delete(@PathVariable String groupId) {
-        String operator = LoginUserUtils.getLoginUserDetail().getUserName();
-        return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
-    }
-
     @RequestMapping(value = "/exist/{groupId}", method = RequestMethod.GET)
     @ApiOperation(value = "Query whether the inlong group id exists")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
@@ -131,6 +122,40 @@ public class InlongGroupController {
         return Response.success(groupProcessOperation.restartProcess(groupId, operator));
     }
 
+    @RequestMapping(value = "/delete/{groupId}", method = RequestMethod.DELETE)
+    @ApiOperation(value = "Delete inlong group information")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+    public Response<Boolean> delete(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+        return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
+    }
+
+    @RequestMapping(value = "suspendProcessAsync/{groupId}", method = RequestMethod.POST)
+    @ApiOperation(value = "Suspend process")
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
+    public Response<String> suspendProcessAsync(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+        return Response.success(groupProcessOperation.suspendProcessAsync(groupId, operator));
+    }
+
+    @RequestMapping(value = "restartProcessAsync/{groupId}", method = RequestMethod.POST)
+    @ApiOperation(value = "Restart process")
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
+    public Response<String> restartProcessAsync(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+        return Response.success(groupProcessOperation.restartProcessAsync(groupId, operator));
+    }
+
+    @RequestMapping(value = "/deleteAsync/{groupId}", method = RequestMethod.DELETE)
+    @ApiOperation(value = "Delete inlong group information")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+    public Response<String> deleteAsync(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+        return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
+    }
+
     @RequestMapping(value = "getTopic/{groupId}", method = RequestMethod.GET)
     @ApiOperation(value = "Get Topic via the inlong group")
     public Response<InlongGroupTopicResponse> getTopic(@PathVariable String groupId) {