You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/12 16:31:25 UTC
[camel] branch camel-3.21.x updated: CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
new c8ad7dc84c3 CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception
c8ad7dc84c3 is described below
commit c8ad7dc84c3c2faf2ef6f18539e56a5346facfbf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Aug 12 18:30:30 2023 +0200
CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception
---
.../java/org/apache/camel/processor/LoopProcessor.java | 14 ++++++++++++++
.../java/org/apache/camel/processor/LoopExceptionTest.java | 1 +
2 files changed, 15 insertions(+)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index f4709f8dfdc..33f9b7f4930 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -163,12 +163,26 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
}
+ if (!cont && expression != null) {
+ // if we should stop due to an exception etc, then make sure to dec task count
+ int gap = count - index;
+ while (gap-- > 0) {
+ taskCount.decrement();
+ }
+ }
callback.done(false);
}
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
}
+ if (expression != null) {
+ // if we should stop due to an exception etc, then make sure to dec task count
+ int gap = count - index;
+ while (gap-- > 0) {
+ taskCount.decrement();
+ }
+ }
exchange.setException(e);
callback.done(false);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
index 976ad39f8e8..1c7795cadc6 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
@@ -26,6 +26,7 @@ public class LoopExceptionTest extends ContextTestSupport {
public void testLoopException() throws Exception {
getMockEndpoint("mock:dead").expectedMessageCount(1);
getMockEndpoint("mock:loop").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
template.sendBody("direct:start", "Hello World");