You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 11:56:37 UTC
[incubator-inlong] branch master updated: [INLONG-701] Update
create resource workflow definition
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd8c60 [INLONG-701] Update create resource workflow definition
4fd8c60 is described below
commit 4fd8c6032f4632d98aeda806096e53c4f11b388d
Author: ireliasheng <ir...@tencent.com>
AuthorDate: Fri Jul 9 19:27:49 2021 +0800
[INLONG-701] Update create resource workflow definition
---
.../inlong/manager/common/beans/TryBean.java | 2 +-
.../CreateTubeConsumeGroupTaskEventListener.java | 2 +-
.../CreateResourceWorkflowDefinition.java | 23 +++++++++++++++++++++-
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
index d9606c4..8bcee60 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
@@ -31,7 +31,7 @@ public class TryBean {
/**
* The first delay time, in milliseconds
*/
- private Long delay = 3000L;
+ private Long delay = 30000L;
/**
* The max delay time, in milliseconds
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
index 31af6ee..3844764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
@@ -116,6 +116,6 @@ public class CreateTubeConsumeGroupTaskEventListener implements TaskEventListene
@Override
public boolean async() {
- return false;
+ return true;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
index 011f3e9..5085df6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.dao.entity.DataStreamEntity;
import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
import org.apache.inlong.manager.service.core.StorageService;
import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForAllStreamListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeConsumeGroupTaskEventListener;
import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskEventListener;
import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigToSortEventListener;
import org.apache.inlong.manager.service.workflow.ProcessName;
@@ -61,6 +62,9 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
private CreateTubeTopicTaskEventListener createTubeTopicTaskEventListener;
@Autowired
+ private CreateTubeConsumeGroupTaskEventListener createTubeConsumeGroupTaskEventListener;
+
+ @Autowired
private CreateHiveTableForAllStreamListener createHiveTableForAllStreamListener;
@Autowired
@@ -112,6 +116,22 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
createTubeTopicTask.addListener(createTubeTopicTaskEventListener);
process.addTask(createTubeTopicTask);
+ ServiceTask createTubeConsumerGroupTask = new ServiceTask();
+ createTubeConsumerGroupTask.setSkipResolver(c -> {
+ CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
+ BusinessInfo businessInfo = form.getBusinessInfo();
+ if (BizConstant.MIDDLEWARE_TYPE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+ return false;
+ }
+ log.warn("no need to create tube resource for bid={}", form.getBusinessId());
+ return true;
+ });
+ createTubeConsumerGroupTask.setName("createTubeSortConsumerGroup");
+ createTubeConsumerGroupTask.setDisplayName("Create Tube Consumer Group");
+ createTubeConsumerGroupTask.addListener(createTubeConsumeGroupTaskEventListener);
+ process.addTask(createTubeConsumerGroupTask);
+
+
ServiceTask createHiveTablesTask = new ServiceTask();
createHiveTablesTask.setSkipResolver(c -> {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
@@ -141,7 +161,8 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
process.addTask(pushSortConfig);
startEvent.addNext(createTubeTopicTask);
- createTubeTopicTask.addNext(createHiveTablesTask);
+ createTubeTopicTask.addNext(createTubeConsumerGroupTask);
+ createTubeConsumerGroupTask.addNext(createHiveTablesTask);
createHiveTablesTask.addNext(pushSortConfig);
pushSortConfig.addNext(endEvent);