Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/31 16:14:42 UTC

[camel] branch master created (now a47a23b)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


      at a47a23b  Regen for commit a55d270c565c127126adc60d0ea9f6cffb06c7b9 (#4971)

This branch includes the following new commits:

     new 0b185d1  CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
     new fbffe84  CAMEL-16120: Upgrade resilience4j to 1.7
     new 75a09b5  Add ignite to git ignore
     new abd3732  CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
     new a60f7ce  Add git ignore for leveldb
     new f6a2fc5  CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
     new f24071a  CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
     new 3781799  CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
     new a47a23b  Regen for commit a55d270c565c127126adc60d0ea9f6cffb06c7b9 (#4971)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 02/09: CAMEL-16120: Upgrade resilience4j to 1.7

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fbffe84195e567bb9b0dba2315acf0b386891d4e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 13:37:03 2021 +0100

    CAMEL-16120: Upgrade resilience4j to 1.7
---
 parent/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index eba2b7e..08a9679 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -473,7 +473,7 @@
         <reactor-netty-version>1.0.2</reactor-netty-version>
         <redisson-version>3.14.0</redisson-version>
         <rescu-version>2.0.2</rescu-version>
-        <resilience4j-version>1.6.1</resilience4j-version>
+        <resilience4j-version>1.7.0</resilience4j-version>
         <rest-assured-version>4.3.3</rest-assured-version>
         <resteasy-version>4.5.6.Final</resteasy-version>
         <rhino-version>1.7.7.1</rhino-version>


[camel] 05/09: Add git ignore for leveldb

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a60f7cec9d4db619c5bec934c69c674574daa90c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 16:39:39 2021 +0100

    Add git ignore for leveldb
---
 components/camel-leveldb-legacy/.gitignore | 1 +
 components/camel-leveldb/.gitignore        | 1 +
 2 files changed, 2 insertions(+)

diff --git a/components/camel-leveldb-legacy/.gitignore b/components/camel-leveldb-legacy/.gitignore
new file mode 100644
index 0000000..c52b279
--- /dev/null
+++ b/components/camel-leveldb-legacy/.gitignore
@@ -0,0 +1 @@
+leveldb.dat
diff --git a/components/camel-leveldb/.gitignore b/components/camel-leveldb/.gitignore
new file mode 100644
index 0000000..c52b279
--- /dev/null
+++ b/components/camel-leveldb/.gitignore
@@ -0,0 +1 @@
+leveldb.dat


[camel] 07/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f24071a331c0b2a50e57d1f5ef52e76fa5e408a6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 16:54:58 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
---
 .../interceptor/TransactedStackSizeBreakOnExceptionTest.java   |  2 +-
 .../interceptor/TransactedStackSizeParallelProcessingTest.java | 10 ++++------
 .../camel/spring/interceptor/TransactedStackSizeTest.java      |  4 ++--
 .../java/org/apache/camel/processor/MulticastProcessor.java    |  3 ++-
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
index 1eb29f2..1a6b5b6 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
@@ -49,7 +49,7 @@ public class TransactedStackSizeBreakOnExceptionTest extends TransactionClientDa
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
 
         int prev = sizes[0];
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
index 739a9ed..729c78d 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
@@ -18,17 +18,15 @@ package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeParallelProcessingTest extends TransactionClientDataSourceSupport {
 
     private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
 
-    @Disabled("Flaky - May report 101 or 102 messages")
-    @RepeatedTest(value = 100)
+    // to test for flaky when using parallel processing then set this to 100
+    @RepeatedTest(value = 1)
     public void testStackSize() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:line").assertNoDuplicates(body());
@@ -49,11 +47,11 @@ public class TransactedStackSizeParallelProcessingTest extends TransactionClient
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 2f0bf71..984121b 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -46,11 +46,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
-            log.info("#{} size {}", i, size);
+            log.debug("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
         sizes[total] = size;
-        log.info("#{} size {}", total, size);
+        log.debug("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f005dc9..948f514 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -288,7 +288,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         // must handle this specially in a while loop structure to ensure the stackframe does not grow deeper
         // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor
         // which is how the routing engine normally operates
-        MulticastTask state = exchange.isTransacted()
+        // if we have parallel processing enabled then we cannot run in transacted mode (requires synchronous processing via same thread)
+        MulticastTask state = !isParallelProcessing() && exchange.isTransacted()
                 ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastReactiveTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
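
The MulticastProcessor comment above contrasts synchronous processing, where stack frames can pile up, with a while loop structure that keeps the stack from growing. The following is a minimal standalone sketch of that difference and is not Camel's implementation: the class StackDepthSketch and its methods are hypothetical, and stack depth is approximated with Thread.getStackTrace().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Camel.
public class StackDepthSketch {

    // Number of frames currently on the calling thread's stack.
    public static int depth() {
        return Thread.currentThread().getStackTrace().length;
    }

    // Nested style: each item is handled from inside the previous item's call,
    // so every item adds one more frame to the stack.
    public static void recursive(int remaining, List<Integer> depths) {
        if (remaining == 0) {
            return;
        }
        depths.add(depth());
        recursive(remaining - 1, depths);
    }

    // Loop style: items are queued and drained by a single while loop,
    // so every item runs at the same stack depth.
    public static void looped(int total, List<Integer> depths) {
        Deque<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < total; i++) {
            queue.add(() -> depths.add(depth()));
        }
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        List<Integer> nested = new ArrayList<>();
        recursive(50, nested);
        List<Integer> flat = new ArrayList<>();
        looped(50, flat);
        System.out.println("nested growth: " + (nested.get(49) - nested.get(0)));
        System.out.println("looped growth: " + (flat.get(49) - flat.get(0)));
    }
}
```

In the nested variant the depth rises by one frame per item, while the looped variant stays constant no matter how many items are processed. That constant depth is the property the TransactedStackSize tests assert.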


[camel] 08/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 378179991f67dfdfdc63877c1bbb070955897a00
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 17:08:24 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
---
 .../spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java     | 2 +-
 .../spring/interceptor/TransactedStackSizeParallelProcessingTest.java   | 2 +-
 .../org/apache/camel/spring/interceptor/TransactedStackSizeTest.java    | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
index 1a6b5b6..f367cc3 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
@@ -23,9 +23,9 @@ import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeBreakOnExceptionTest extends TransactionClientDataSourceSupport {
 
+    private static final boolean PRINT_STACK_TRACE = false;
     private int total = 100;
     private int failAt = 70;
-    private static final boolean PRINT_STACK_TRACE = false;
 
     @Test
     public void testStackSize() throws Exception {
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
index 729c78d..d229ea3 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
@@ -22,8 +22,8 @@ import org.junit.jupiter.api.RepeatedTest;
 
 public class TransactedStackSizeParallelProcessingTest extends TransactionClientDataSourceSupport {
 
-    private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
+    private int total = 100;
 
     // to test for flaky when using parallel processing then set this to 100
     @RepeatedTest(value = 1)
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 984121b..2ad0ffe 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -22,8 +22,8 @@ import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeTest extends TransactionClientDataSourceSupport {
 
-    private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
+    private int total = 100;
 
     @Test
     public void testStackSize() throws Exception {


[camel] 03/09: Add ignite to git ignore

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75a09b5021d50a80ffb846b3e043408cbce8dfa3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 15:49:11 2021 +0100

    Add ignite to git ignore
---
 components/camel-ignite/.gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/components/camel-ignite/.gitignore b/components/camel-ignite/.gitignore
new file mode 100644
index 0000000..25f33bd6
--- /dev/null
+++ b/components/camel-ignite/.gitignore
@@ -0,0 +1 @@
+ignite


[camel] 04/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit abd37329a5167332b86f5299a20676ff38b705e5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 16:38:51 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
---
 ...> TransactedStackSizeBreakOnExceptionTest.java} | 36 +++++++++++++---------
 ...TransactedStackSizeParallelProcessingTest.java} | 20 +++++++-----
 .../interceptor/TransactedStackSizeTest.java       |  7 +++--
 3 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
similarity index 67%
copy from components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
copy to components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
index 9aa6b67..1eb29f2 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeBreakOnExceptionTest.java
@@ -16,45 +16,46 @@
  */
 package org.apache.camel.spring.interceptor;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TransactedStackSizeTest extends TransactionClientDataSourceSupport {
+public class TransactedStackSizeBreakOnExceptionTest extends TransactionClientDataSourceSupport {
 
     private int total = 100;
+    private int failAt = 70;
     private static final boolean PRINT_STACK_TRACE = false;
 
     @Test
     public void testStackSize() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(total);
-        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:line").expectedMessageCount(failAt);
+        getMockEndpoint("mock:line").assertNoDuplicates(body());
+        getMockEndpoint("mock:result").expectedMessageCount(0);
 
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < total; i++) {
             sb.append(i);
             sb.append(",");
         }
+
         template.sendBody("seda:start", "" + sb.toString());
 
         assertMockEndpointsSatisfied();
 
-        int[] sizes = new int[total + 1];
-        for (int i = 0; i < total; i++) {
+        int[] sizes = new int[failAt];
+        for (int i = 0; i < failAt; i++) {
             int size = getMockEndpoint("mock:line").getReceivedExchanges().get(i).getMessage().getHeader("stackSize",
                     int.class);
             sizes[i] = size;
             Assertions.assertTrue(size < 100, "Stackframe should be < 100");
             log.info("#{} size {}", i, size);
         }
-        int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
-        sizes[total] = size;
-        log.info("#{} size {}", total, size);
 
         int prev = sizes[0];
         // last may be shorter, so use total - 1
-        for (int i = 1; i < total - 1; i++) {
-            size = sizes[i];
+        for (int i = 1; i < failAt; i++) {
+            int size = sizes[i];
             Assertions.assertEquals(prev, size, "Stackframe should be same size");
         }
     }
@@ -66,13 +67,18 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
             public void configure() throws Exception {
                 from("seda:start")
                     .transacted()
-                    .split(body())
-                        .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                        .log("${body} stack-size ${header.stackSize}")
+                    .setHeader("stackSize", TransactedStackSizeBreakOnExceptionTest::currentStackSize)
+                    .log("BEGIN: ${body} stack-size ${header.stackSize}")
+                    .split(body()).stopOnException()
+                        .setHeader("stackSize", TransactedStackSizeBreakOnExceptionTest::currentStackSize)
+                        .log("LINE: ${body} stack-size ${header.stackSize}")
                         .to("mock:line")
+                        .filter(header(Exchange.SPLIT_INDEX).isEqualTo(failAt))
+                            .throwException(new IllegalStateException("Forced"))
+                        .end()
                     .end()
-                    .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                    .log("${body} stack-size ${header.stackSize}")
+                    .setHeader("stackSize", TransactedStackSizeBreakOnExceptionTest::currentStackSize)
+                    .log("RESULT: ${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
similarity index 78%
copy from components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
copy to components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
index 9aa6b67..739a9ed 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeParallelProcessingTest.java
@@ -18,16 +18,20 @@ package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
-public class TransactedStackSizeTest extends TransactionClientDataSourceSupport {
+public class TransactedStackSizeParallelProcessingTest extends TransactionClientDataSourceSupport {
 
     private int total = 100;
     private static final boolean PRINT_STACK_TRACE = false;
 
-    @Test
+    @Disabled("Flaky - May report 101 or 102 messages")
+    @RepeatedTest(value = 100)
     public void testStackSize() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(total);
+        getMockEndpoint("mock:line").assertNoDuplicates(body());
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
         StringBuilder sb = new StringBuilder();
@@ -66,13 +70,15 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
             public void configure() throws Exception {
                 from("seda:start")
                     .transacted()
-                    .split(body())
-                        .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                        .log("${body} stack-size ${header.stackSize}")
+                    .setHeader("stackSize", TransactedStackSizeParallelProcessingTest::currentStackSize)
+                    .log("BEGIN: ${body} stack-size ${header.stackSize}")
+                    .split(body()).parallelProcessing()
+                        .setHeader("stackSize", TransactedStackSizeParallelProcessingTest::currentStackSize)
+                        .log("LINE: ${body} stack-size ${header.stackSize}")
                         .to("mock:line")
                     .end()
-                    .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                    .log("${body} stack-size ${header.stackSize}")
+                    .setHeader("stackSize", TransactedStackSizeParallelProcessingTest::currentStackSize)
+                    .log("RESULT: ${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 9aa6b67..2f0bf71 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -28,6 +28,7 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
     @Test
     public void testStackSize() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(total);
+        getMockEndpoint("mock:line").assertNoDuplicates(body());
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
         StringBuilder sb = new StringBuilder();
@@ -66,13 +67,15 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
             public void configure() throws Exception {
                 from("seda:start")
                     .transacted()
+                    .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
+                    .log("BEGIN: ${body} stack-size ${header.stackSize}")
                     .split(body())
                         .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                        .log("${body} stack-size ${header.stackSize}")
+                        .log("LINE: ${body} stack-size ${header.stackSize}")
                         .to("mock:line")
                     .end()
                     .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
-                    .log("${body} stack-size ${header.stackSize}")
+                    .log("RESULT: ${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
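
The tests above record a stackSize header through a currentStackSize method reference whose body is not part of these diffs. As a rough sketch of how such a measurement and the accompanying checks could look, assuming the depth is taken from a Throwable's stack trace (the class StackSizeCheck and the helper isFlat are hypothetical names, not taken from the Camel code base):

```java
// Hypothetical stand-in; the real currentStackSize helper is not shown in these commits.
public class StackSizeCheck {

    // Approximates the current stack depth as the number of frames in a fresh stack trace.
    public static int currentStackSize() {
        return new Throwable().getStackTrace().length;
    }

    // Mirrors the two assertions in the tests: every recorded size must be below
    // the limit, and every size must equal the first one recorded.
    public static boolean isFlat(int[] sizes, int limit) {
        for (int size : sizes) {
            if (size >= limit || size != sizes[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] sizes = new int[10];
        for (int i = 0; i < sizes.length; i++) {
            // Each call is made from the same frame, so the depth should not vary.
            sizes[i] = currentStackSize();
        }
        System.out.println("flat=" + isFlat(sizes, 100));
    }
}
```

Comparing each size against the first, rather than against its predecessor, matches the test loops, which never update prev after reading sizes[0].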


[camel] 01/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0b185d12d9436acda7f2160594cd27e2c10cb3b1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 12:34:24 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grow deep and lead to stack overflow error.
---
 .../interceptor/TransactedStackSizeTest.java       |  31 +++-
 .../apache/camel/processor/MulticastProcessor.java | 206 ++++++++++++++++-----
 .../ROOT/pages/camel-3x-upgrade-guide-3_8.adoc     |   9 +
 3 files changed, 194 insertions(+), 52 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 69e0206..9aa6b67 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -17,31 +17,46 @@
 package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeTest extends TransactionClientDataSourceSupport {
 
-    private static final boolean PRINT_STACK_TRACE = true;
+    private int total = 100;
+    private static final boolean PRINT_STACK_TRACE = false;
 
     @Test
     public void testStackSize() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(10);
+        getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        template.sendBody("seda:start", "A,B,C,D,E,F,G,H,I,J");
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < total; i++) {
+            sb.append(i);
+            sb.append(",");
+        }
+        template.sendBody("seda:start", "" + sb.toString());
 
         assertMockEndpointsSatisfied();
 
-        int[] sizes = new int[11];
-        for (int i = 0; i < 10; i++) {
+        int[] sizes = new int[total + 1];
+        for (int i = 0; i < total; i++) {
             int size = getMockEndpoint("mock:line").getReceivedExchanges().get(i).getMessage().getHeader("stackSize",
                     int.class);
             sizes[i] = size;
+            Assertions.assertTrue(size < 100, "Stackframe should be < 100");
             log.info("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
-        sizes[10] = size;
-        log.info("#{} size {}", 10, size);
+        sizes[total] = size;
+        log.info("#{} size {}", total, size);
+
+        int prev = sizes[0];
+        // last may be shorter, so use total - 1
+        for (int i = 1; i < total - 1; i++) {
+            size = sizes[i];
+            Assertions.assertEquals(prev, size, "Stackframe should be same size");
+        }
     }
 
     @Override
@@ -53,9 +68,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
                     .transacted()
                     .split(body())
                         .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
+                        .log("${body} stack-size ${header.stackSize}")
                         .to("mock:line")
                     .end()
                     .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
+                    .log("${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index c8c4559..b87a21d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -282,7 +282,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
             return true;
         }
 
-        MulticastTask state = new MulticastTask(exchange, pairs, callback);
+        // we need to run in either transacted or reactive mode because the threading model is different
+        // when we run in transacted mode, then we use synchronous processing on the current thread
+        // this can lead to a long execution which can lead to deep stackframes, and therefore we
+        // must handle this specially in a while loop structure to ensure the stackframe does not grow deeper
+        // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor
+        // which is how the routing engine normally operates
+        AbstractMulticastTask state = exchange.isTransacted()
+                ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
@@ -307,7 +314,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         }
     }
 
-    protected class MulticastTask implements Runnable {
+    protected abstract class AbstractMulticastTask implements Runnable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -321,7 +328,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
 
-        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+        AbstractMulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -339,6 +346,71 @@ public class MulticastProcessor extends AsyncProcessorSupport
             return "MulticastTask";
         }
 
+        protected void aggregate() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    Exchange exchange;
+                    while (!done.get() && (exchange = completion.poll()) != null) {
+                        doAggregate(result, exchange, original);
+                        if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
+                            doDone(result.get(), true);
+                        }
+                    }
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void timeout() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    while (nbAggregated.get() < nbExchangeSent.get()) {
+                        Exchange exchange = completion.pollUnordered();
+                        int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
+                        while (nbAggregated.get() < index) {
+                            AggregationStrategy strategy = getAggregationStrategy(null);
+                            strategy.timeout(result.get() != null ? result.get() : original,
+                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
+                        }
+                        if (exchange != null) {
+                            doAggregate(result, exchange, original);
+                            nbAggregated.incrementAndGet();
+                        }
+                    }
+                    doDone(result.get(), true);
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void doDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            }
+        }
+    }
+
+    /**
+     * Sub task processed reactively via the {@link ReactiveExecutor}.
+     */
+    protected class MulticastTask extends AbstractMulticastTask {
+
+        public MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
         @Override
         public void run() {
             try {
@@ -421,59 +493,103 @@ public class MulticastProcessor extends AsyncProcessorSupport
             }
         }
 
-        protected void aggregate() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
+    }
+
+    /**
+     * Transacted sub task processed synchronously using {@link Processor#process(Exchange)} with the same thread in a
+     * while loop control flow.
+     */
+    protected class MulticastTransactedTask extends AbstractMulticastTask {
+
+        public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
+        @Override
+        public void run() {
+            boolean next = true;
+            while (next) {
                 try {
-                    Exchange exchange;
-                    while (!done.get() && (exchange = completion.poll()) != null) {
-                        doAggregate(result, exchange, original);
-                        if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
-                            doDone(result.get(), true);
-                        }
-                    }
-                } catch (Throwable e) {
+                    next = doRun();
+                } catch (Exception e) {
                     original.setException(e);
-                    // and do the done work
                     doDone(null, false);
-                } finally {
-                    lock.unlock();
+                    return;
                 }
             }
         }
 
-        protected void timeout() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
-                try {
-                    while (nbAggregated.get() < nbExchangeSent.get()) {
-                        Exchange exchange = completion.pollUnordered();
-                        int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
-                        while (nbAggregated.get() < index) {
-                            AggregationStrategy strategy = getAggregationStrategy(null);
-                            strategy.timeout(result.get() != null ? result.get() : original,
-                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
-                        }
-                        if (exchange != null) {
-                            doAggregate(result, exchange, original);
-                            nbAggregated.incrementAndGet();
-                        }
-                    }
-                    doDone(result.get(), true);
-                } catch (Throwable e) {
-                    original.setException(e);
-                    // and do the done work
-                    doDone(null, false);
-                } finally {
-                    lock.unlock();
+        boolean doRun() throws Exception {
+            if (done.get()) {
+                return false;
+            }
+
+            // Check if the iterator is empty
+            // This can happen the very first time we check the existence
+            // of an item before queuing the run.
+            // or some iterators may return true for hasNext() but then null in next()
+            if (!iterator.hasNext()) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            ProcessorExchangePair pair = iterator.next();
+            boolean hasNext = iterator.hasNext();
+            // some iterators may return true for hasNext() but then null in next()
+            if (pair == null && !hasNext) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            Exchange exchange = pair.getExchange();
+            int index = nbExchangeSent.getAndIncrement();
+            updateNewExchange(exchange, index, pairs, hasNext);
+
+            // Schedule the processing of the next pair
+            if (hasNext) {
+                if (isParallelProcessing()) {
+                    schedule(this);
                 }
+            } else {
+                allSent.set(true);
             }
-        }
 
-        protected void doDone(Exchange exchange, boolean forceExhaust) {
-            if (done.compareAndSet(false, true)) {
-                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            // process next
+
+            // compute time taken if sending to another endpoint
+            StopWatch watch = beforeSend(pair);
+            Processor sync = pair.getProcessor();
+            try {
+                sync.process(exchange);
+            } finally {
+                afterSend(pair, watch);
+            }
+
+            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
+            // remember to test for stop on exception and aggregate before copying back results
+            boolean continueProcessing = PipelineHelper.continueProcessing(exchange,
+                    "Multicast processing failed for number " + index, LOG);
+            if (stopOnException && !continueProcessing) {
+                if (exchange.getException() != null) {
+                    // wrap in exception to explain where it failed
+                    exchange.setException(new CamelExchangeException(
+                            "Multicast processing failed for number " + index, exchange, exchange.getException()));
+                } else {
+                    // we want to stop on exception, and the exception was handled by the error handler
+                    // this is similar to what the pipeline does, so we should do the same to not surprise end users
+                    // so we should set the failed exchange as the result and be done
+                    result.set(exchange);
+                }
+                // and do the done work
+                doDone(exchange, true);
+                return false;
             }
+
+            // aggregate exchanges if any
+            aggregate();
+
+            // next step
+            return true;
         }
     }
 
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
index 11c67f6..585bcf6 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
@@ -33,6 +33,15 @@ The method `isOnlyDynamicQueryParameters` is removed from `org.apache.camel.spi.
 
 The `onCompletion` EIP has been fixed so that it no longer triggers multiple completions for a given `Exchange`
 
+=== Transactions and Multicast, Splitter, or Recipient List EIPs
+
+When using `transacted` in Camel routes with Multicast, Splitter, or Recipient List EIPs, the execution stackframe
+may grow deep and could cause a stack overflow error. This has been fixed by refactoring the EIPs into a special
+transacted mode alongside the existing reactive mode.
+
+We do not anticipate any issues but if you are using transactions and these EIPs then we would like to have feedback
+if you encounter any problems with upgrading.
+
 === camel-jackson
 
 When using XML DSL then `jsonView` has been renamed to `jsonViewTypeName` and made generally available in the model,
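
The stack-depth problem described in the upgrade note above can be illustrated in isolation. The following is a minimal sketch, not Camel code: the class `StackDepthSketch` and its methods are hypothetical, and plain integers stand in for exchanges. It contrasts a style where each step calls the next step before returning with the while-loop style used by `MulticastTransactedTask.run()`, where a `doRun()`-like step returns a boolean and the caller loops.

```java
import java.util.Collections;
import java.util.Iterator;

public class StackDepthSketch {

    // Recursive style: each step calls the next step before it returns,
    // so the call stack gains one frame per item processed.
    static int processRecursively(Iterator<Integer> items, int processed) {
        if (!items.hasNext()) {
            return processed;
        }
        items.next();
        return processRecursively(items, processed + 1);
    }

    // One step of the loop style: handle a single item and report
    // whether the caller should continue.
    static boolean doRun(Iterator<Integer> items, int[] processed) {
        if (!items.hasNext()) {
            return false;
        }
        items.next();
        processed[0]++;
        return true;
    }

    // Loop style: the stack depth stays constant regardless of item count.
    static int processInLoop(Iterator<Integer> items) {
        int[] processed = {0};
        boolean next = true;
        while (next) {
            next = doRun(items, processed);
        }
        return processed[0];
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        System.out.println("loop processed " + processInLoop(Collections.nCopies(n, 0).iterator()));
        try {
            System.out.println("recursion processed "
                    + processRecursively(Collections.nCopies(n, 0).iterator(), 0));
        } catch (StackOverflowError e) {
            System.out.println("recursion overflowed the stack");
        }
    }
}
```

Whether the recursive variant actually overflows depends on the JVM's thread stack size, so the sketch catches `StackOverflowError` rather than assuming it. The loop variant adds no frames per item, which is the property the transacted task relies on.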


[camel] 06/09: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grown deep and lead to stack overflow error.

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f6a2fc55d120cc23087921af0f4b1e522c7a348c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 16:41:33 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode will cause stackframe sizes to grown deep and lead to stack overflow error.
---
 .../org/apache/camel/processor/MulticastProcessor.java     | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b87a21d..f005dc9 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -288,8 +288,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         // must handle this specially in a while loop structure to ensure the stackframe does not grow deeper
         // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor
         // which is how the routing engine normally operates
-        AbstractMulticastTask state = exchange.isTransacted()
-                ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastTask(exchange, pairs, callback);
+        MulticastTask state = exchange.isTransacted()
+                ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastReactiveTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
@@ -314,7 +314,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         }
     }
 
-    protected abstract class AbstractMulticastTask implements Runnable {
+    protected abstract class MulticastTask implements Runnable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -328,7 +328,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
 
-        AbstractMulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -405,9 +405,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
     /**
      * Sub task processed reactively via the {@link ReactiveExecutor}.
      */
-    protected class MulticastTask extends AbstractMulticastTask {
+    protected class MulticastReactiveTask extends MulticastTask {
 
-        public MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+        public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
         }
 
@@ -499,7 +499,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
      * Transacted sub task processed synchronously using {@link Processor#process(Exchange)} with the same thread in a
      * while loop control flow.
      */
-    protected class MulticastTransactedTask extends AbstractMulticastTask {
+    protected class MulticastTransactedTask extends MulticastTask {
 
         public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
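
The `aggregate()` and `doDone()` methods that this series moves into the shared task base class combine two guards: a `tryLock()` around draining the completion queue, and a `compareAndSet(false, true)` on the `done` flag so that the completion work runs at most once. The sketch below shows that shape in a simplified form. It is an illustration under assumptions, not the Camel implementation: `AggregateOnceSketch`, `complete()`, `sum()` and `doneCalls()` are hypothetical names, integers stand in for exchanges, and summing stands in for the aggregation strategy.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AggregateOnceSketch {

    private final Queue<Integer> completion = new ConcurrentLinkedQueue<>();
    private final Lock lock = new ReentrantLock();
    private final AtomicBoolean done = new AtomicBoolean();
    private final AtomicInteger sum = new AtomicInteger();
    private final AtomicInteger nbAggregated = new AtomicInteger();
    private final AtomicInteger doneCalls = new AtomicInteger();
    private final int expected;

    AggregateOnceSketch(int expected) {
        this.expected = expected;
    }

    // Hypothetical entry point: record a finished sub task, then try to aggregate.
    void complete(int value) {
        completion.add(value);
        aggregate();
    }

    // Drain the queue only if the lock is free; a caller that fails
    // to get the lock returns immediately instead of blocking.
    void aggregate() {
        if (lock.tryLock()) {
            try {
                Integer value;
                while (!done.get() && (value = completion.poll()) != null) {
                    sum.addAndGet(value);
                    if (nbAggregated.incrementAndGet() >= expected) {
                        doDone();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    // The compare-and-set makes the completion work run at most once.
    void doDone() {
        if (done.compareAndSet(false, true)) {
            doneCalls.incrementAndGet();
        }
    }

    int sum() {
        return sum.get();
    }

    int doneCalls() {
        return doneCalls.get();
    }

    public static void main(String[] args) {
        AggregateOnceSketch task = new AggregateOnceSketch(3);
        task.complete(1);
        task.complete(2);
        task.complete(3);
        task.complete(4); // arrives after done, so it is not aggregated
        task.doDone();    // a second call has no effect
        System.out.println("sum=" + task.sum() + " doneCalls=" + task.doneCalls());
    }
}
```

The `main` method exercises the sketch on a single thread only; it does not demonstrate or verify behaviour under concurrent callers.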


[camel] 09/09: Regen for commit a55d270c565c127126adc60d0ea9f6cffb06c7b9 (#4971)

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a47a23bf59f05303dbe05a22fbe93be9d281e1b6
Author: github-actions[bot] <41...@users.noreply.github.com>
AuthorDate: Sun Jan 31 17:09:08 2021 +0100

    Regen for commit a55d270c565c127126adc60d0ea9f6cffb06c7b9 (#4971)
    
    Signed-off-by: GitHub <no...@github.com>
    
    Co-authored-by: oscerd <os...@users.noreply.github.com>
---
 .../resources/org/apache/camel/catalog/schemas/camel-spring.xsd       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 070e9a6..d7cdfbd 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -6651,7 +6651,7 @@ Class name of the java type to use when unmarshalling.
             ]]></xs:documentation>
           </xs:annotation>
         </xs:attribute>
-        <xs:attribute name="jsonView" type="xs:string">
+        <xs:attribute name="jsonViewTypeName" type="xs:string">
           <xs:annotation>
             <xs:documentation xml:lang="en"><![CDATA[
 When marshalling a POJO to JSON you might want to exclude certain fields from
@@ -6974,7 +6974,7 @@ Class name of the java type to use when unmarshalling.
             ]]></xs:documentation>
           </xs:annotation>
         </xs:attribute>
-        <xs:attribute name="jsonView" type="xs:string">
+        <xs:attribute name="jsonViewTypeName" type="xs:string">
           <xs:annotation>
             <xs:documentation xml:lang="en"><![CDATA[
 When marshalling a POJO to JSON you might want to exclude certain fields from