You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 16:26:51 UTC
[incubator-inlong] 01/01: [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 84fd9f171d68c39fc2b8238be17728f5ff728ab1
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jun 8 00:05:47 2022 +0800
[INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558)
---
.../manager/workflow/core/TransactionHelper.java | 3 +-
.../workflow/core/impl/ProcessorExecutorImpl.java | 28 ---------------
.../workflow/processor/ServiceTaskProcessor.java | 41 +++++++++++++++-------
3 files changed, 31 insertions(+), 41 deletions(-)
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 704d2bc0b..2cf61e4ba 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -33,10 +33,11 @@ import org.springframework.util.Assert;
import java.lang.reflect.UndeclaredThrowableException;
/**
- * Transaction Helper
+ * Transaction Helper, now deprecated because we use @Transactional instead
*/
@Slf4j
@Service
+@Deprecated
public class TransactionHelper {
@Autowired
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index bf9c055f3..5366cf39c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -21,15 +21,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
-import org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
-import org.apache.inlong.manager.workflow.core.TransactionHelper;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.NextableElement;
import org.apache.inlong.manager.workflow.definition.SkippableElement;
-import org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.processor.ElementProcessor;
import org.apache.inlong.manager.workflow.processor.EndEventProcessor;
import org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor;
@@ -38,8 +34,6 @@ import org.apache.inlong.manager.workflow.processor.StartEventProcessor;
import org.apache.inlong.manager.workflow.processor.UserTaskProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.support.TransactionCallback;
import javax.annotation.PostConstruct;
import java.util.List;
@@ -53,8 +47,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends Element>> elementProcessor;
- @Autowired
- private TransactionHelper transactionHelper;
@Autowired
private StartEventProcessor startEventProcessor;
@Autowired
@@ -101,15 +93,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
return;
}
- // If it is a continuous task execution transaction isolation
- if (element instanceof WorkflowTask) {
- TransactionCallback<Object> callback = executeCompleteInTransaction(element, context);
- if (callback != null) {
- transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED);
- }
- return;
- }
-
executeComplete(element, context);
}
@@ -159,15 +142,4 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
}
}
- private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) {
- try {
- executeComplete(element, context);
- return null;
- } catch (WorkflowNoRollbackException e) { // Exception does not roll back
- throw e;
- } catch (Exception e) { // The exception is only rolled back once
- throw new WorkflowRollbackOnceException(e.getMessage());
- }
- }
-
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index c779ba04d..73e58bd2b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.workflow.processor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.TaskStatus;
import org.apache.inlong.manager.common.exceptions.JsonException;
@@ -49,6 +50,7 @@ import java.util.Set;
*/
@Service
@NoArgsConstructor
+@Slf4j
public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
@@ -77,27 +79,29 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
public void create(ServiceTask serviceTask, WorkflowContext context) {
WorkflowTaskEntity workflowTaskEntity = saveTaskEntity(serviceTask, context);
context.getNewTaskList().add(workflowTaskEntity);
- serviceTask.initListeners(context);
- this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+ try {
+ serviceTask.initListeners(context);
+ this.taskEventNotifier.notify(TaskEvent.CREATE, context);
+ } catch (Exception e) {
+ log.error("Create service task failed", e);
+ ActionContext actionContext = new WorkflowContext.ActionContext()
+ .setTask((WorkflowTask) context.getCurrentElement())
+ .setRemark("failed when create");
+ completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
+ this.taskEventNotifier.notify(TaskEvent.FAIL, context);
+ this.processEventNotifier.notify(ProcessEvent.FAIL, context);
+ }
}
@Override
public boolean pendingForAction(WorkflowContext context) {
- context.setActionContext(
- new WorkflowContext.ActionContext()
- .setTask((WorkflowTask) context.getCurrentElement())
- .setAction(WorkflowAction.COMPLETE)
- .setTaskEntity(context.getNewTaskList().get(0))
- );
- context.getNewTaskList().clear();
return false;
}
@Override
public boolean complete(WorkflowContext context) {
+ resetActionContext(context);
WorkflowContext.ActionContext actionContext = context.getActionContext();
- Preconditions.checkTrue(SUPPORT_ACTIONS.contains(actionContext.getAction()),
- "serviceTask not support action: " + actionContext.getAction());
WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
"task status should allow complete");
@@ -107,6 +111,7 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.COMPLETED);
return true;
} catch (Exception e) {
+ log.error("Complete service task failed", e);
completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED);
this.taskEventNotifier.notify(TaskEvent.FAIL, context);
this.processEventNotifier.notify(ProcessEvent.FAIL, context);
@@ -114,6 +119,16 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
}
}
+ private void resetActionContext(WorkflowContext context) {
+ context.setActionContext(
+ new WorkflowContext.ActionContext()
+ .setTask((WorkflowTask) context.getCurrentElement())
+ .setAction(WorkflowAction.COMPLETE)
+ .setTaskEntity(context.getNewTaskList().get(0))
+ );
+ context.getNewTaskList().clear();
+ }
+
private WorkflowTaskEntity saveTaskEntity(ServiceTask serviceTask, WorkflowContext context) {
WorkflowProcessEntity workflowProcessEntity = context.getProcessEntity();
List<String> approvers = ApproverAssign.DEFAULT_SYSTEM_APPROVER.assign(context);
@@ -139,7 +154,9 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
taskEntity.setOperator(taskEntity.getApprovers());
taskEntity.setRemark(actionContext.getRemark());
try {
- taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+ if (actionContext.getForm() != null) {
+ taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+ }
} catch (Exception e) {
throw new JsonException("write form to json error: ", e);
}