You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 11:56:37 UTC

[incubator-inlong] branch master updated: [INLONG-701] Update create resource workflow definition

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd8c60  [INLONG-701] Update create resource workflow definition
4fd8c60 is described below

commit 4fd8c6032f4632d98aeda806096e53c4f11b388d
Author: ireliasheng <ir...@tencent.com>
AuthorDate: Fri Jul 9 19:27:49 2021 +0800

    [INLONG-701] Update create resource workflow definition
---
 .../inlong/manager/common/beans/TryBean.java       |  2 +-
 .../CreateTubeConsumeGroupTaskEventListener.java   |  2 +-
 .../CreateResourceWorkflowDefinition.java          | 23 +++++++++++++++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
index d9606c4..8bcee60 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
@@ -31,7 +31,7 @@ public class TryBean {
     /**
      * The first delay time, in milliseconds
      */
-    private Long delay = 3000L;
+    private Long delay = 30000L;
     /**
      * The max delay time, in milliseconds
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
index 31af6ee..3844764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
@@ -116,6 +116,6 @@ public class CreateTubeConsumeGroupTaskEventListener implements TaskEventListene
 
     @Override
     public boolean async() {
-        return false;
+        return true;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
index 011f3e9..5085df6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.dao.entity.DataStreamEntity;
 import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
 import org.apache.inlong.manager.service.core.StorageService;
 import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForAllStreamListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeConsumeGroupTaskEventListener;
 import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskEventListener;
 import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigToSortEventListener;
 import org.apache.inlong.manager.service.workflow.ProcessName;
@@ -61,6 +62,9 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
     private CreateTubeTopicTaskEventListener createTubeTopicTaskEventListener;
 
     @Autowired
+    private CreateTubeConsumeGroupTaskEventListener createTubeConsumeGroupTaskEventListener;
+
+    @Autowired
     private CreateHiveTableForAllStreamListener createHiveTableForAllStreamListener;
 
     @Autowired
@@ -112,6 +116,22 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
         createTubeTopicTask.addListener(createTubeTopicTaskEventListener);
         process.addTask(createTubeTopicTask);
 
+        ServiceTask createTubeConsumerGroupTask = new ServiceTask();
+        createTubeConsumerGroupTask.setSkipResolver(c -> {
+            CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
+            BusinessInfo businessInfo = form.getBusinessInfo();
+            if (BizConstant.MIDDLEWARE_TYPE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+                return false;
+            }
+            log.warn("no need to create tube resource for bid={}", form.getBusinessId());
+            return true;
+        });
+        createTubeConsumerGroupTask.setName("createTubeSortConsumerGroup");
+        createTubeConsumerGroupTask.setDisplayName("Create Tube Consumer Group");
+        createTubeConsumerGroupTask.addListener(createTubeConsumeGroupTaskEventListener);
+        process.addTask(createTubeConsumerGroupTask);
+
+
         ServiceTask createHiveTablesTask = new ServiceTask();
         createHiveTablesTask.setSkipResolver(c -> {
             CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
@@ -141,7 +161,8 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
         process.addTask(pushSortConfig);
 
         startEvent.addNext(createTubeTopicTask);
-        createTubeTopicTask.addNext(createHiveTablesTask);
+        createTubeTopicTask.addNext(createTubeConsumerGroupTask);
+        createTubeConsumerGroupTask.addNext(createHiveTablesTask);
         createHiveTablesTask.addNext(pushSortConfig);
         pushSortConfig.addNext(endEvent);