You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/07/08 14:24:23 UTC

[camel] branch camel-3.11.x updated: CAMEL-16794: race condition in LoopProcessor (#5813)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.11.x by this push:
     new d8958ee  CAMEL-16794: race condition in LoopProcessor (#5813)
d8958ee is described below

commit d8958ee8f34936c834e6df2e7d3e380922f8401a
Author: Sergio Penkale <38...@users.noreply.github.com>
AuthorDate: Thu Jul 8 15:22:28 2021 +0100

    CAMEL-16794: race condition in LoopProcessor (#5813)
    
    * CAMEL-16794: race condition in LoopProcessor
    
    * CAMEL-16794: avoid synchronized method
    
    Co-authored-by: Sergio Penkale <se...@lingo24.com>
---
 .../main/java/org/apache/camel/processor/LoopProcessor.java    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index e70f188..435ed9d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -47,7 +49,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
     private String id;
     private String routeId;
-    private volatile LoopState state;
     private boolean shutdownPending;
     private final CamelContext camelContext;
     private final ReactiveExecutor reactiveExecutor;
@@ -55,6 +56,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
     private final Predicate predicate;
     private final boolean copy;
     private final boolean breakOnShutdown;
+    private final LongAdder taskCount = new LongAdder();
 
     public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate,
                          boolean copy, boolean breakOnShutdown) {
@@ -70,7 +72,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            state = new LoopState(exchange, callback);
+            LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
                 reactiveExecutor.scheduleSync(state);
@@ -92,7 +94,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
     @Override
     public int getPendingExchangesSize() {
-        return state != null ? state.getPendingSize() : 0;
+        return taskCount.intValue();
     }
 
     @Override
@@ -122,6 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                 // but evaluation result is a textual representation of a numeric value.
                 String text = expression.evaluate(exchange, String.class);
                 count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+                taskCount.add(count);
                 exchange.setProperty(ExchangePropertyKey.LOOP_SIZE, count);
             }
         }
@@ -147,6 +150,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     processor.process(current, doneSync -> {
                         // increment counter after done
                         index++;
+                        taskCount.decrement();
                         reactiveExecutor.schedule(this);
                     });
                 } else {