You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 16:26:51 UTC

[incubator-inlong] 01/01: [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 84fd9f171d68c39fc2b8238be17728f5ff728ab1
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jun 8 00:05:47 2022 +0800

    [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)
---
 .../manager/workflow/core/TransactionHelper.java   |  3 +-
 .../workflow/core/impl/ProcessorExecutorImpl.java  | 28 ---------------
 .../workflow/processor/ServiceTaskProcessor.java   | 41 +++++++++++++++-------
 3 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 704d2bc0b..2cf61e4ba 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -33,10 +33,11 @@ import org.springframework.util.Assert;
 import java.lang.reflect.UndeclaredThrowableException;
 
 /**
- * Transaction Helper
+ * Transaction Helper, now deprecated because we use @Transactional instead
  */
 @Slf4j
 @Service
+@Deprecated
 public class TransactionHelper {
 
     @Autowired
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index bf9c055f3..5366cf39c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -21,15 +21,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
-import org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
-import org.apache.inlong.manager.workflow.core.TransactionHelper;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.NextableElement;
 import org.apache.inlong.manager.workflow.definition.SkippableElement;
-import org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.processor.ElementProcessor;
 import org.apache.inlong.manager.workflow.processor.EndEventProcessor;
 import org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor;
@@ -38,8 +34,6 @@ import org.apache.inlong.manager.workflow.processor.StartEventProcessor;
 import org.apache.inlong.manager.workflow.processor.UserTaskProcessor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.support.TransactionCallback;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
@@ -53,8 +47,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
 
     private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends Element>> elementProcessor;
 
-    @Autowired
-    private TransactionHelper transactionHelper;
     @Autowired
     private StartEventProcessor startEventProcessor;
     @Autowired
@@ -101,15 +93,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
             return;
         }
 
-        // If it is a continuous task execution transaction isolation
-        if (element instanceof WorkflowTask) {
-            TransactionCallback<Object> callback = executeCompleteInTransaction(element, context);
-            if (callback != null) {
-                transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED);
-            }
-            return;
-        }
-
         executeComplete(element, context);
     }
 
@@ -159,15 +142,4 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
         }
     }
 
-    private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) {
-        try {
-            executeComplete(element, context);
-            return null;
-        } catch (WorkflowNoRollbackException e) { // Exception does not roll back
-            throw e;
-        } catch (Exception e) { // The exception is only rolled back once
-            throw new WorkflowRollbackOnceException(e.getMessage());
-        }
-    }
-
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index c779ba04d..73e58bd2b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.workflow.processor;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.TaskStatus;
 import org.apache.inlong.manager.common.exceptions.JsonException;
@@ -49,6 +50,7 @@ import java.util.Set;
  */
 @Service
 @NoArgsConstructor
+@Slf4j
 public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
 
     private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
@@ -77,27 +79,29 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
     public void create(ServiceTask serviceTask, WorkflowContext context) {
         WorkflowTaskEntity workflowTaskEntity = saveTaskEntity(serviceTask, context);
         context.getNewTaskList().add(workflowTaskEntity);
-        serviceTask.initListeners(context);
-        this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+        try {
+            serviceTask.initListeners(context);
+            this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+        } catch (Exception e) {
+            log.error("Create service task failed", e);
+            ActionContext actionContext = new WorkflowContext.ActionContext()
+                    .setTask((WorkflowTask) context.getCurrentElement())
+                    .setRemark("failed when create");
+            completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
+            this.taskEventNotifier.notify(TaskEvent.FAIL, context);
+            this.processEventNotifier.notify(ProcessEvent.FAIL, context);
+        }
     }
 
     @Override
     public boolean pendingForAction(WorkflowContext context) {
-        context.setActionContext(
-                new WorkflowContext.ActionContext()
-                        .setTask((WorkflowTask) context.getCurrentElement())
-                        .setAction(WorkflowAction.COMPLETE)
-                        .setTaskEntity(context.getNewTaskList().get(0))
-        );
-        context.getNewTaskList().clear();
         return false;
     }
 
     @Override
     public boolean complete(WorkflowContext context) {
+        resetActionContext(context);
         WorkflowContext.ActionContext actionContext = context.getActionContext();
-        Preconditions.checkTrue(SUPPORT_ACTIONS.contains(actionContext.getAction()),
-                "serviceTask not support action: " + actionContext.getAction());
         WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
         Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
                 "task status should allow complete");
@@ -107,6 +111,7 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
             completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.COMPLETED);
             return true;
         } catch (Exception e) {
+            log.error("Complete service task failed", e);
             completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
             this.taskEventNotifier.notify(TaskEvent.FAIL, context);
             this.processEventNotifier.notify(ProcessEvent.FAIL, context);
@@ -114,6 +119,16 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         }
     }
 
+    private void resetActionContext(WorkflowContext context) {
+        context.setActionContext(
+                new WorkflowContext.ActionContext()
+                        .setTask((WorkflowTask) context.getCurrentElement())
+                        .setAction(WorkflowAction.COMPLETE)
+                        .setTaskEntity(context.getNewTaskList().get(0))
+        );
+        context.getNewTaskList().clear();
+    }
+
     private WorkflowTaskEntity saveTaskEntity(ServiceTask serviceTask, WorkflowContext context) {
         WorkflowProcessEntity workflowProcessEntity = context.getProcessEntity();
         List<String> approvers = ApproverAssign.DEFAULT_SYSTEM_APPROVER.assign(context);
@@ -139,7 +154,9 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         taskEntity.setOperator(taskEntity.getApprovers());
         taskEntity.setRemark(actionContext.getRemark());
         try {
-            taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+            if (actionContext.getForm() != null) {
+                taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+            }
         } catch (Exception e) {
             throw new JsonException("write form to json error: ", e);
         }