You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/11 05:13:57 UTC

[inlong] branch master updated: [INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c033c47d [INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)
3c033c47d is described below

commit 3c033c47d64e9403c7d60cb444a8eed15e079c96
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Aug 11 13:13:51 2022 +0800

    [INLONG-5452][Manager] Change creating resource after group approval to be asynchronous (#5453)
---
 .../group/apply/ApproveApplyProcessListener.java       |  7 ++++---
 .../workflow/event/process/ProcessEventListener.java   | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
index ea6412944..058dfb725 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
@@ -18,14 +18,14 @@
 package org.apache.inlong.manager.service.listener.group.apply;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -69,8 +69,9 @@ public class ApproveApplyProcessListener implements ProcessEventListener {
         String username = context.getOperator();
         List<InlongStreamInfo> streamList = streamService.list(groupId);
         processForm.setStreamInfos(streamList);
-        workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
 
+        // resource creation may run for a long time, so start it asynchronously
+        EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm));
         log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId);
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
index 38a62adf9..ead9875bc 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
@@ -18,9 +18,15 @@
 package org.apache.inlong.manager.workflow.event.process;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.inlong.manager.workflow.event.EventListener;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Process event listener
@@ -32,4 +38,16 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> {
      */
     List<ProcessEventListener> EMPTY_LIST = Lists.newArrayList();
 
+    /**
+     * Common thread pool for asynchronous processing
+     */
+    ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            20,
+            40,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(),
+            new CallerRunsPolicy());
+
 }