You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/23 04:31:17 UTC

[incubator-inlong] branch master updated: [INLONG-2662][Manager] Fix bug to avoid reinit of service task (#2664)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c01266d  [INLONG-2662][Manager] Fix bug to avoid reinit of service task (#2664)
c01266d is described below

commit c01266d8f1abfada5d4b7ba058d1f94410f34096
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Feb 23 12:31:12 2022 +0800

    [INLONG-2662][Manager] Fix bug to avoid reinit of service task (#2664)
---
 .../service/workflow/WorkflowServiceImplTest.java       |  2 ++
 .../inlong/manager/workflow/definition/ServiceTask.java | 17 +++++++++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index d0e69d0..54bf64c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -64,11 +64,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Collections;
 import java.util.List;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@EnableAutoConfiguration
 public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     public static final String OPERATOR = "admin";
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index e401fe1..482320a 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -19,6 +19,8 @@ package org.apache.inlong.manager.workflow.definition;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.WorkflowAction;
@@ -33,6 +35,7 @@ import java.util.Set;
 /**
  * System task
  */
+@Slf4j
 public class ServiceTask extends WorkflowTask {
 
     private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
@@ -42,6 +45,8 @@ public class ServiceTask extends WorkflowTask {
 
     private ServiceTaskType serviceTaskType;
 
+    private AtomicBoolean isInit = new AtomicBoolean(false);
+
     @Override
     public WorkflowAction defaultNextAction() {
         return WorkflowAction.COMPLETE;
@@ -90,11 +95,15 @@ public class ServiceTask extends WorkflowTask {
     }
 
     public void initListeners(WorkflowContext workflowContext) {
-        if (listenerProvider == null || serviceTaskType == null) {
-            return;
+        if (isInit.compareAndSet(false, true)) {
+            if (listenerProvider == null || serviceTaskType == null) {
+                return;
+            }
+            Iterable<TaskEventListener> listeners = listenerProvider.get(workflowContext, serviceTaskType);
+            addListeners(Lists.newArrayList(listeners));
+        } else {
+            log.debug("ServiceTask:{} is already init", getName());
         }
-        Iterable<TaskEventListener> listeners = listenerProvider.get(workflowContext, serviceTaskType);
-        addListeners(Lists.newArrayList(listeners));
     }
 
 }