You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/31 16:14:49 UTC

[camel] 07/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode causing stackframe sizes to grow deep and lead to a stack overflow error.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f24071a331c0b2a50e57d1f5ef52e76fa5e408a6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 16:54:58 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode causing stackframe sizes to grow deep and lead to a stack overflow error.
---
 .../interceptor/TransactedStackSizeBreakOnExceptionTest.java   |  2 +-
 .../interceptor/TransactedStackSizeParallelProcessingTest.java | 10 ++++------
 .../camel/spring/interceptor/TransactedStackSizeTest.java      |  4 ++--
 .../java/org/apache/camel/processor/MulticastProcessor.java    |  3 ++-
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
index 1eb29f2..1a6b5b6 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
@@ -49,7 +49,7 @@ public class TransactedStackSizeBreakOnExceptionTest extends TransactionClientDa
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
 
         int prev = sizes[0];
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
index 739a9ed..729c78d 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
@@ -18,17 +18,15 @@ package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeParallelProcessingTest extends TransactionClientDataSourceSupport {
 
     private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
 
-    @Disabled("Flaky - May report 101 or 102 messages")
-    @RepeatedTest(value = 100)
+    // to check for flakiness when using parallel processing, set this to 100
+    @RepeatedTest(value = 1)
     public void testStackSize() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:line").assertNoDuplicates(body());
@@ -49,11 +47,11 @@ public class TransactedStackSizeParallelProcessingTest extends TransactionClient
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 2f0bf71..984121b 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -46,11 +46,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f005dc9..948f514 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -288,7 +288,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         // must handle this specially in a while loop structure to ensure the strackframe does not grow deeper
         // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor
         // which is how the routing engine normally operates
-        MulticastTask state = exchange.isTransacted()
+        // if we have parallel processing enabled then we cannot run in transacted mode (requires synchronous processing via same thread)
+        MulticastTask state = !isParallelProcessing() && exchange.isTransacted()
                 ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastReactiveTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));