You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/16 06:24:52 UTC

[incubator-inlong] branch master updated: [INLONG-4212][Manager] The processor executor maybe throws a null pointer exception (#4213)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b8bec8221 [INLONG-4212][Manager] The processor executor maybe throws a null pointer exception (#4213)
b8bec8221 is described below

commit b8bec82219f444e62095c16875981f498457cb96
Author: healchow <he...@gmail.com>
AuthorDate: Mon May 16 14:24:47 2022 +0800

    [INLONG-4212][Manager] The processor executor maybe throws a null pointer exception (#4213)
---
 .../workflow/core/impl/ProcessorExecutorImpl.java  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index f4e1aab1b..694044946 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -103,8 +103,10 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
 
         // If it is a continuous task execution transaction isolation
         if (element instanceof WorkflowTask) {
-            transactionHelper.execute(executeCompleteInTransaction(element, context),
-                    TransactionDefinition.PROPAGATION_NESTED);
+            TransactionCallback<Object> callback = executeCompleteInTransaction(element, context);
+            if (callback != null) {
+                transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED);
+            }
             return;
         }
 
@@ -120,7 +122,9 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
             return;
         }
         List<Element> nextElements = processor.next(element, context);
-        nextElements.forEach(next -> executeStart(next, context));
+        for (Element next : nextElements) {
+            executeStart(next, context);
+        }
     }
 
     private boolean isSkipCurrentElement(Element element, WorkflowContext context) {
@@ -150,20 +154,20 @@ public class ProcessorExecutorImpl implements ProcessorExecutor {
         // Execute next
         context.getActionContext().setAction(((NextableElement) element).defaultNextAction());
         List<Element> nextElements = processor.next(element, context);
-        nextElements.forEach(next -> executeStart(next, context));
+        for (Element next : nextElements) {
+            executeStart(next, context);
+        }
     }
 
     private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) {
-        return s -> {
-            try {
-                executeComplete(element, context);
-                return null;
-            } catch (WorkflowNoRollbackException e) { // Exception does not roll back
-                throw e;
-            } catch (Exception e) { // The exception is only rolled back once
-                throw new WorkflowRollbackOnceException(e.getMessage());
-            }
-        };
+        try {
+            executeComplete(element, context);
+            return null;
+        } catch (WorkflowNoRollbackException e) { // Exception does not roll back
+            throw e;
+        } catch (Exception e) { // The exception is only rolled back once
+            throw new WorkflowRollbackOnceException(e.getMessage());
+        }
     }
 
 }