You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/12 16:31:25 UTC

[camel] branch camel-3.21.x updated: CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new c8ad7dc84c3 CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception
c8ad7dc84c3 is described below

commit c8ad7dc84c3c2faf2ef6f18539e56a5346facfbf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Aug 12 18:30:30 2023 +0200

    CAMEL-19738: camel-core - Loop EIP can have wrong pending inflight in case of early loop exit due to exception
---
 .../java/org/apache/camel/processor/LoopProcessor.java     | 14 ++++++++++++++
 .../java/org/apache/camel/processor/LoopExceptionTest.java |  1 +
 2 files changed, 15 insertions(+)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index f4709f8dfdc..33f9b7f4930 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -163,12 +163,26 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
                     }
+                    if (!cont && expression != null) {
+                        // if we should stop due to an exception etc, then make sure to dec task count
+                        int gap = count - index;
+                        while (gap-- > 0) {
+                            taskCount.decrement();
+                        }
+                    }
                     callback.done(false);
                 }
             } catch (Exception e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
                 }
+                if (expression != null) {
+                    // if we should stop due to an exception etc, then make sure to dec task count
+                    int gap = count - index;
+                    while (gap-- > 0) {
+                        taskCount.decrement();
+                    }
+                }
                 exchange.setException(e);
                 callback.done(false);
             }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
index 976ad39f8e8..1c7795cadc6 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopExceptionTest.java
@@ -26,6 +26,7 @@ public class LoopExceptionTest extends ContextTestSupport {
     public void testLoopException() throws Exception {
         getMockEndpoint("mock:dead").expectedMessageCount(1);
         getMockEndpoint("mock:loop").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
 
         template.sendBody("direct:start", "Hello World");