You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/16 06:24:52 UTC
[incubator-inlong] branch master updated: [INLONG-4212][Manager] The processor executor may throw a null pointer exception (#4213)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b8bec8221 [INLONG-4212][Manager] The processor executor may throw a null pointer exception (#4213)
b8bec8221 is described below
commit b8bec82219f444e62095c16875981f498457cb96
Author: healchow <he...@gmail.com>
AuthorDate: Mon May 16 14:24:47 2022 +0800
[INLONG-4212][Manager] The processor executor may throw a null pointer exception (#4213)
---
.../workflow/core/impl/ProcessorExecutorImpl.java | 32 ++++++++++++----------
1 file changed, 18 insertions(+), 14 deletions(-)
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index f4e1aab1b..694044946 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -103,8 +103,10 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
// If it is a continuous task execution transaction isolation
if (element instanceof WorkflowTask) {
- transactionHelper.execute(executeCompleteInTransaction(element, context),
- TransactionDefinition.PROPAGATION_NESTED);
+ TransactionCallback<Object> callback = executeCompleteInTransaction(element, context);
+ if (callback != null) {
+ transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED);
+ }
return;
}
@@ -120,7 +122,9 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
return;
}
List<Element> nextElements = processor.next(element, context);
- nextElements.forEach(next -> executeStart(next, context));
+ for (Element next : nextElements) {
+ executeStart(next, context);
+ }
}
private boolean isSkipCurrentElement(Element element, WorkflowContext context) {
@@ -150,20 +154,20 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
// Execute next
context.getActionContext().setAction(((NextableElement) element).defaultNextAction());
List<Element> nextElements = processor.next(element, context);
- nextElements.forEach(next -> executeStart(next, context));
+ for (Element next : nextElements) {
+ executeStart(next, context);
+ }
}
private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) {
- return s -> {
- try {
- executeComplete(element, context);
- return null;
- } catch (WorkflowNoRollbackException e) { // Exception does not roll back
- throw e;
- } catch (Exception e) { // The exception is only rolled back once
- throw new WorkflowRollbackOnceException(e.getMessage());
- }
- };
+ try {
+ executeComplete(element, context);
+ return null;
+ } catch (WorkflowNoRollbackException e) { // Exception does not roll back
+ throw e;
+ } catch (Exception e) { // The exception is only rolled back once
+ throw new WorkflowRollbackOnceException(e.getMessage());
+ }
}
}