You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/07/08 14:24:23 UTC
[camel] branch camel-3.11.x updated: CAMEL-16794: race condition in LoopProcessor (#5813)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.11.x by this push:
new d8958ee CAMEL-16794: race condition in LoopProcessor (#5813)
d8958ee is described below
commit d8958ee8f34936c834e6df2e7d3e380922f8401a
Author: Sergio Penkale <38...@users.noreply.github.com>
AuthorDate: Thu Jul 8 15:22:28 2021 +0100
CAMEL-16794: race condition in LoopProcessor (#5813)
* CAMEL-16794: race condition in LoopProcessor
* CAMEL-16794: avoid synchronized method
Co-authored-by: Sergio Penkale <se...@lingo24.com>
---
.../main/java/org/apache/camel/processor/LoopProcessor.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index e70f188..435ed9d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.atomic.LongAdder;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -47,7 +49,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
private String id;
private String routeId;
- private volatile LoopState state;
private boolean shutdownPending;
private final CamelContext camelContext;
private final ReactiveExecutor reactiveExecutor;
@@ -55,6 +56,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
private final Predicate predicate;
private final boolean copy;
private final boolean breakOnShutdown;
+ private final LongAdder taskCount = new LongAdder();
public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate,
boolean copy, boolean breakOnShutdown) {
@@ -70,7 +72,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- state = new LoopState(exchange, callback);
+ LoopState state = new LoopState(exchange, callback);
if (exchange.isTransacted()) {
reactiveExecutor.scheduleSync(state);
@@ -92,7 +94,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
@Override
public int getPendingExchangesSize() {
- return state != null ? state.getPendingSize() : 0;
+ return taskCount.intValue();
}
@Override
@@ -122,6 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
// but evaluation result is a textual representation of a numeric value.
String text = expression.evaluate(exchange, String.class);
count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+ taskCount.add(count);
exchange.setProperty(ExchangePropertyKey.LOOP_SIZE, count);
}
}
@@ -147,6 +150,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
processor.process(current, doneSync -> {
// increment counter after done
index++;
+ taskCount.decrement();
reactiveExecutor.schedule(this);
});
} else {