You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/11 05:13:57 UTC
[inlong] branch master updated: [INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3c033c47d [INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)
3c033c47d is described below
commit 3c033c47d64e9403c7d60cb444a8eed15e079c96
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Aug 11 13:13:51 2022 +0800
[INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)
---
.../group/apply/ApproveApplyProcessListener.java | 7 ++++---
.../workflow/event/process/ProcessEventListener.java | 18 ++++++++++++++++++
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
index ea6412944..058dfb725 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
@@ -18,14 +18,14 @@
package org.apache.inlong.manager.service.listener.group.apply;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -69,8 +69,9 @@ public class ApproveApplyProcessListener implements ProcessEventListener {
String username = context.getOperator();
List<InlongStreamInfo> streamList = streamService.list(groupId);
processForm.setStreamInfos(streamList);
- workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
+ // may run for long time, make it async processing
+ EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm));
log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
index 38a62adf9..ead9875bc 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
@@ -18,9 +18,15 @@
package org.apache.inlong.manager.workflow.event.process;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.inlong.manager.workflow.event.EventListener;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
/**
* Process event listener
@@ -32,4 +38,16 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> {
*/
List<ProcessEventListener> EMPTY_LIST = Lists.newArrayList();
+ /**
+ * Async process common thread pool
+ */
+ ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ 20,
+ 40,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(),
+ new CallerRunsPolicy());
+
}