You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 16:26:50 UTC

[incubator-inlong] branch master updated (43e5b91ba -> 84fd9f171)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


    omit 43e5b91ba [INLONG-4556][Manager] Optimize ProcessorExecutor Logic (#4558)
     new 84fd9f171 [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (43e5b91ba)
            \
             N -- N -- N   refs/heads/master (84fd9f171)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-inlong] 01/01: [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 84fd9f171d68c39fc2b8238be17728f5ff728ab1
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jun 8 00:05:47 2022 +0800

    [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)
---
 .../manager/workflow/core/TransactionHelper.java   |  3 +-
 .../workflow/core/impl/ProcessorExecutorImpl.java  | 28 ---------------
 .../workflow/processor/ServiceTaskProcessor.java   | 41 +++++++++++++++-------
 3 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 704d2bc0b..2cf61e4ba 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -33,10 +33,11 @@ import org.springframework.util.Assert;
 import java.lang.reflect.UndeclaredThrowableException;
 
 /**
- * Transaction Helper
+ * Transaction Helper, now deprecated because we use @Transactional instead
  */
 @Slf4j
 @Service
+@Deprecated
 public class TransactionHelper {
 
     @Autowired
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index bf9c055f3..5366cf39c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -21,15 +21,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
-import org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
-import org.apache.inlong.manager.workflow.core.TransactionHelper;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.NextableElement;
 import org.apache.inlong.manager.workflow.definition.SkippableElement;
-import org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.processor.ElementProcessor;
 import org.apache.inlong.manager.workflow.processor.EndEventProcessor;
 import org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor;
@@ -38,8 +34,6 @@ import org.apache.inlong.manager.workflow.processor.StartEventProcessor;
 import org.apache.inlong.manager.workflow.processor.UserTaskProcessor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.support.TransactionCallback;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
@@ -53,8 +47,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
 
     private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends Element>> elementProcessor;
 
-    @Autowired
-    private TransactionHelper transactionHelper;
     @Autowired
     private StartEventProcessor startEventProcessor;
     @Autowired
@@ -101,15 +93,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
             return;
         }
 
-        // If it is a continuous task execution transaction isolation
-        if (element instanceof WorkflowTask) {
-            TransactionCallback<Object> callback = executeCompleteInTransaction(element, context);
-            if (callback != null) {
-                transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED);
-            }
-            return;
-        }
-
         executeComplete(element, context);
     }
 
@@ -159,15 +142,4 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
         }
     }
 
-    private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) {
-        try {
-            executeComplete(element, context);
-            return null;
-        } catch (WorkflowNoRollbackException e) { // Exception does not roll back
-            throw e;
-        } catch (Exception e) { // The exception is only rolled back once
-            throw new WorkflowRollbackOnceException(e.getMessage());
-        }
-    }
-
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index c779ba04d..73e58bd2b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.workflow.processor;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.TaskStatus;
 import org.apache.inlong.manager.common.exceptions.JsonException;
@@ -49,6 +50,7 @@ import java.util.Set;
  */
 @Service
 @NoArgsConstructor
+@Slf4j
 public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
 
     private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
@@ -77,27 +79,29 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
     public void create(ServiceTask serviceTask, WorkflowContext context) {
         WorkflowTaskEntity workflowTaskEntity = saveTaskEntity(serviceTask, context);
         context.getNewTaskList().add(workflowTaskEntity);
-        serviceTask.initListeners(context);
-        this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+        try {
+            serviceTask.initListeners(context);
+            this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+        } catch (Exception e) {
+            log.error("Create service task failed", e);
+            ActionContext actionContext = new WorkflowContext.ActionContext()
+                    .setTask((WorkflowTask) context.getCurrentElement())
+                    .setRemark("failed when create");
+            completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
+            this.taskEventNotifier.notify(TaskEvent.FAIL, context);
+            this.processEventNotifier.notify(ProcessEvent.FAIL, context);
+        }
     }
 
     @Override
     public boolean pendingForAction(WorkflowContext context) {
-        context.setActionContext(
-                new WorkflowContext.ActionContext()
-                        .setTask((WorkflowTask) context.getCurrentElement())
-                        .setAction(WorkflowAction.COMPLETE)
-                        .setTaskEntity(context.getNewTaskList().get(0))
-        );
-        context.getNewTaskList().clear();
         return false;
     }
 
     @Override
     public boolean complete(WorkflowContext context) {
+        resetActionContext(context);
         WorkflowContext.ActionContext actionContext = context.getActionContext();
-        Preconditions.checkTrue(SUPPORT_ACTIONS.contains(actionContext.getAction()),
-                "serviceTask not support action: " + actionContext.getAction());
         WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
         Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
                 "task status should allow complete");
@@ -107,6 +111,7 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
             completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.COMPLETED);
             return true;
         } catch (Exception e) {
+            log.error("Complete service task failed", e);
             completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
             this.taskEventNotifier.notify(TaskEvent.FAIL, context);
             this.processEventNotifier.notify(ProcessEvent.FAIL, context);
@@ -114,6 +119,16 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         }
     }
 
+    private void resetActionContext(WorkflowContext context) {
+        context.setActionContext(
+                new WorkflowContext.ActionContext()
+                        .setTask((WorkflowTask) context.getCurrentElement())
+                        .setAction(WorkflowAction.COMPLETE)
+                        .setTaskEntity(context.getNewTaskList().get(0))
+        );
+        context.getNewTaskList().clear();
+    }
+
     private WorkflowTaskEntity saveTaskEntity(ServiceTask serviceTask, WorkflowContext context) {
         WorkflowProcessEntity workflowProcessEntity = context.getProcessEntity();
         List<String> approvers = ApproverAssign.DEFAULT_SYSTEM_APPROVER.assign(context);
@@ -139,7 +154,9 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         taskEntity.setOperator(taskEntity.getApprovers());
         taskEntity.setRemark(actionContext.getRemark());
         try {
-            taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+            if (actionContext.getForm() != null) {
+                taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+            }
         } catch (Exception e) {
             throw new JsonException("write form to json error: ", e);
         }