You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/15 12:07:43 UTC
[incubator-inlong] branch master updated: [INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete (#3150)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 11804b5 [INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete (#3150)
11804b5 is described below
commit 11804b513b4b9e9fa5e49b2804a0aa62f753f48e
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue Mar 15 20:07:01 2022 +0800
[INLONG-3149][Manager] Support async method for Inlong group stop/restart/delete (#3150)
---
.../inlong/manager/client/api/InlongGroup.java | 23 +++++++-
.../manager/client/api/impl/BlankInlongGroup.java | 15 +++++
.../manager/client/api/impl/InlongGroupImpl.java | 21 ++++++-
.../client/api/inner/InnerInlongManagerClient.java | 27 ++++++++-
.../core/impl/InlongGroupProcessOperation.java | 66 +++++++++++++++++++++-
.../web/controller/InlongGroupController.java | 43 +++++++++++---
6 files changed, 176 insertions(+), 19 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 652a2be..94f94c7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -32,7 +32,6 @@ public interface InlongGroup {
* Create snapshot for Inlong group
*
* @return
- *
* @throws Exception
*/
InlongGroupContext context() throws Exception;
@@ -50,7 +49,6 @@ public interface InlongGroup {
* Update Inlong group on updated conf
*
* @return
- *
* @throws Exception
*/
void update(InlongGroupConf conf) throws Exception;
@@ -71,6 +69,13 @@ public interface InlongGroup {
InlongGroupContext suspend() throws Exception;
/**
+ * Suspend the stream group and return group info.
+ *
+ * @return group info
+ */
+ InlongGroupContext suspend(boolean async) throws Exception;
+
+ /**
* Restart the stream group and return group info.
*
* @return group info
@@ -78,6 +83,13 @@ public interface InlongGroup {
InlongGroupContext restart() throws Exception;
/**
+ * Restart the stream group and return group info.
+ *
+ * @return group info
+ */
+ InlongGroupContext restart(boolean async) throws Exception;
+
+ /**
* delete the stream group and return group info
*
* @return group info
@@ -85,6 +97,13 @@ public interface InlongGroup {
InlongGroupContext delete() throws Exception;
/**
+ * delete the stream group and return group info
+ *
+ * @return group info
+ */
+ InlongGroupContext delete(boolean async) throws Exception;
+
+ /**
* List all inlong streams in certain group
*
* @return inlong stream contained in this group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index 37c62cb..d94521f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -59,16 +59,31 @@ public class BlankInlongGroup implements InlongGroup {
}
@Override
+ public InlongGroupContext suspend(boolean async) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
public InlongGroupContext restart() throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}
@Override
+ public InlongGroupContext restart(boolean async) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
public InlongGroupContext delete() throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}
@Override
+ public InlongGroupContext delete(boolean async) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
public List<InlongStream> listStreams() throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8f96a07..17fc05c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -150,31 +150,46 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext suspend() throws Exception {
+ return suspend(false);
+ }
+
+ @Override
+ public InlongGroupContext suspend(boolean async) throws Exception {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
Pair<String, String> idAndErr = managerClient.updateGroup(groupInfo.genRequest());
final String errMsg = idAndErr.getValue();
final String groupId = idAndErr.getKey();
AssertUtil.isNull(errMsg, errMsg);
- managerClient.operateInlongGroup(groupId, InlongGroupState.STOPPED);
+ managerClient.operateInlongGroup(groupId, InlongGroupState.STOPPED, async);
return generateSnapshot();
}
@Override
public InlongGroupContext restart() throws Exception {
+ return restart(false);
+ }
+
+ @Override
+ public InlongGroupContext restart(boolean async) throws Exception {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
Pair<String, String> idAndErr = managerClient.updateGroup(groupInfo.genRequest());
final String errMsg = idAndErr.getValue();
final String groupId = idAndErr.getKey();
AssertUtil.isNull(errMsg, errMsg);
- managerClient.operateInlongGroup(groupId, InlongGroupState.STARTED);
+ managerClient.operateInlongGroup(groupId, InlongGroupState.STARTED, async);
return generateSnapshot();
}
@Override
public InlongGroupContext delete() throws Exception {
+ return delete(false);
+ }
+
+ @Override
+ public InlongGroupContext delete(boolean async) throws Exception {
InlongGroupResponse groupResponse = managerClient.getGroupInfo(
groupContext.getGroupId());
- boolean isDeleted = managerClient.deleteInlongGroup(groupResponse.getInlongGroupId());
+ boolean isDeleted = managerClient.deleteInlongGroup(groupResponse.getInlongGroupId(), async);
if (isDeleted) {
groupResponse.setStatus(GroupState.DELETED.getCode());
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 8c8c4d7..8db3ca1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -567,11 +567,23 @@ public class InnerInlongManagerClient {
}
public boolean operateInlongGroup(String groupId, InlongGroupState status) {
+ return operateInlongGroup(groupId, status, false);
+ }
+
+ public boolean operateInlongGroup(String groupId, InlongGroupState status, boolean async) {
String path = HTTP_PATH;
if (status == InlongGroupState.STOPPED) {
- path += "/group/suspendProcess/";
+ if (async) {
+ path += "/group/suspendProcessAsync/";
+ } else {
+ path += "/group/suspendProcess/";
+ }
} else if (status == InlongGroupState.STARTED) {
- path += "/group/restartProcess/";
+ if (async) {
+ path += "/group/restartProcessAsync/";
+ } else {
+ path += "/group/restartProcess/";
+ }
} else {
throw new IllegalArgumentException(String.format("Unsupported state: %s", status));
}
@@ -599,7 +611,16 @@ public class InnerInlongManagerClient {
}
public boolean deleteInlongGroup(String groupId) {
- final String path = HTTP_PATH + "/group/delete/" + groupId;
+ return deleteInlongGroup(groupId, false);
+ }
+
+ public boolean deleteInlongGroup(String groupId, boolean async) {
+ String path = HTTP_PATH;
+ if (async) {
+ path += "/group/deleteAsync/" + groupId;
+ } else {
+ path += "/group/delete/" + groupId;
+ }
final String url = formatUrl(path);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
Request request = new Request.Builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 6e9a41e..7ce1040 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.core.impl;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -38,6 +39,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
/**
* Operation related to inlong group process
@@ -46,13 +52,22 @@ import java.util.List;
public class InlongGroupProcessOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessOperation.class);
+
+ private final ExecutorService executorService = new ThreadPoolExecutor(
+ 20,
+ 40,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(),
+ new CallerRunsPolicy());
+
@Autowired
private InlongGroupService groupService;
@Autowired
private WorkflowService workflowService;
@Autowired
private InlongStreamService streamService;
-
@Autowired
private InlongStreamEntityMapper streamMapper;
@@ -72,6 +87,21 @@ public class InlongGroupProcessOperation {
}
/**
+ * Suspend resource application group in an asynchronous way,
+ * stop source and sort task related to application group asynchronously,
+ * persist the application status if necessary.
+ *
+ * @return groupId
+ */
+ public String suspendProcessAsync(String groupId, String operator) {
+ LOGGER.info("begin to suspend process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ groupService.updateStatus(groupId, GroupState.SUSPENDING.getCode(), operator);
+ UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.SUSPEND);
+ executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+ return groupId;
+ }
+
+ /**
* Suspend resource application group which is started up successfully,
* stop source and sort task related to application group asynchronously,
* persist the application status if necessary.
@@ -86,6 +116,20 @@ public class InlongGroupProcessOperation {
}
/**
+ * Restart resource application group in an asynchronous way,
+ * starting from the last persist snapshot.
+ *
+ * @return Workflow result
+ */
+ public String restartProcessAsync(String groupId, String operator) {
+ LOGGER.info("begin to restart process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ groupService.updateStatus(groupId, GroupState.RESTARTING.getCode(), operator);
+ UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.RESTART);
+ executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+ return groupId;
+ }
+
+ /**
* Restart resource application group which is suspended successfully,
* starting from the last persist snapshot.
*
@@ -99,7 +143,25 @@ public class InlongGroupProcessOperation {
}
/**
- * Delete resource application group logically and delete related resource
+ * Delete resource application group logically and delete related resource in an
+ */
+ public String deleteProcessAsync(String groupId, String operator) {
+ LOGGER.info("begin to delete process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ executorService.execute(() -> {
+ try {
+ UpdateGroupProcessForm form = genUpdateGroupProcessForm(groupId, OperateType.DELETE);
+ workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
+ } catch (Exception ex) {
+ LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+ throw ex;
+ }
+ groupService.delete(groupId, operator);
+ });
+ return groupId;
+ }
+
+ /**
+ * Delete resource application group logically and delete related resource in an asynchronous way
*/
public boolean deleteProcess(String groupId, String operator) {
LOGGER.info("begin to delete process, groupId = {}, operator = {}", groupId, operator);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 8036553..ff3176d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -84,15 +84,6 @@ public class InlongGroupController {
return Response.success(groupService.update(groupInfo, operator));
}
- @RequestMapping(value = "/delete/{groupId}", method = RequestMethod.DELETE)
- @ApiOperation(value = "Delete inlong group information")
- @OperationLog(operation = OperationType.DELETE)
- @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
- public Response<Boolean> delete(@PathVariable String groupId) {
- String operator = LoginUserUtils.getLoginUserDetail().getUserName();
- return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
- }
-
@RequestMapping(value = "/exist/{groupId}", method = RequestMethod.GET)
@ApiOperation(value = "Query whether the inlong group id exists")
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
@@ -131,6 +122,40 @@ public class InlongGroupController {
return Response.success(groupProcessOperation.restartProcess(groupId, operator));
}
+ @RequestMapping(value = "/delete/{groupId}", method = RequestMethod.DELETE)
+ @ApiOperation(value = "Delete inlong group information")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+ public Response<Boolean> delete(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+ return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
+ }
+
+ @RequestMapping(value = "suspendProcessAsync/{groupId}", method = RequestMethod.POST)
+ @ApiOperation(value = "Suspend process")
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
+ public Response<String> suspendProcessAsync(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+ return Response.success(groupProcessOperation.suspendProcessAsync(groupId, operator));
+ }
+
+ @RequestMapping(value = "restartProcessAsync/{groupId}", method = RequestMethod.POST)
+ @ApiOperation(value = "Restart process")
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
+ public Response<String> restartProcessAsync(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+ return Response.success(groupProcessOperation.restartProcessAsync(groupId, operator));
+ }
+
+ @RequestMapping(value = "/deleteAsync/{groupId}", method = RequestMethod.DELETE)
+ @ApiOperation(value = "Delete inlong group information")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+ public Response<String> deleteAsync(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUserDetail().getUserName();
+ return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
+ }
+
@RequestMapping(value = "getTopic/{groupId}", method = RequestMethod.GET)
@ApiOperation(value = "Get Topic via the inlong group")
public Response<InlongGroupTopicResponse> getTopic(@PathVariable String groupId) {