You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/31 15:55:45 UTC

[camel] 01/07: CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode causing stackframe sizes to grow deep and leading to a stack overflow error.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0b185d12d9436acda7f2160594cd27e2c10cb3b1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jan 31 12:34:24 2021 +0100

    CAMEL-16103: Fixed multicast based EIPs (splitter, recipient list) in transacted mode causing stackframe sizes to grow deep and leading to a stack overflow error.
---
 .../interceptor/TransactedStackSizeTest.java       |  31 +++-
 .../apache/camel/processor/MulticastProcessor.java | 206 ++++++++++++++++-----
 .../ROOT/pages/camel-3x-upgrade-guide-3_8.adoc     |   9 +
 3 files changed, 194 insertions(+), 52 deletions(-)

diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
index 69e0206..9aa6b67 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactedStackSizeTest.java
@@ -17,31 +17,46 @@
 package org.apache.camel.spring.interceptor;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TransactedStackSizeTest extends TransactionClientDataSourceSupport {
 
-    private static final boolean PRINT_STACK_TRACE = true;
+    private int total = 100;
+    private static final boolean PRINT_STACK_TRACE = false;
 
     @Test
     public void testStackSize() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(10);
+        getMockEndpoint("mock:line").expectedMessageCount(total);
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        template.sendBody("seda:start", "A,B,C,D,E,F,G,H,I,J");
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < total; i++) {
+            sb.append(i);
+            sb.append(",");
+        }
+        template.sendBody("seda:start", "" + sb.toString());
 
         assertMockEndpointsSatisfied();
 
-        int[] sizes = new int[11];
-        for (int i = 0; i < 10; i++) {
+        int[] sizes = new int[total + 1];
+        for (int i = 0; i < total; i++) {
             int size = getMockEndpoint("mock:line").getReceivedExchanges().get(i).getMessage().getHeader("stackSize",
                     int.class);
             sizes[i] = size;
+            Assertions.assertTrue(size < 100, "Stackframe should be < 100");
             log.info("#{} size {}", i, size);
         }
         int size = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getMessage().getHeader("stackSize", int.class);
-        sizes[10] = size;
-        log.info("#{} size {}", 10, size);
+        sizes[total] = size;
+        log.info("#{} size {}", total, size);
+
+        int prev = sizes[0];
+        // last may be shorter, so use total - 1
+        for (int i = 1; i < total - 1; i++) {
+            size = sizes[i];
+            Assertions.assertEquals(prev, size, "Stackframe should be same size");
+        }
     }
 
     @Override
@@ -53,9 +68,11 @@ public class TransactedStackSizeTest extends TransactionClientDataSourceSupport
                     .transacted()
                     .split(body())
                         .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
+                        .log("${body} stack-size ${header.stackSize}")
                         .to("mock:line")
                     .end()
                     .setHeader("stackSize", TransactedStackSizeTest::currentStackSize)
+                    .log("${body} stack-size ${header.stackSize}")
                     .to("mock:result");
             }
         };
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index c8c4559..b87a21d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -282,7 +282,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
             return true;
         }
 
-        MulticastTask state = new MulticastTask(exchange, pairs, callback);
+        // we need to run in either transacted or reactive mode because the threading model is different
+        // when we run in transacted mode, then we use synchronous processing on the current thread
+        // this can lead to a long execution which can lead to deep stackframes, and therefore we
+        // must handle this specially in a while loop structure to ensure the stackframe does not grow deeper
+        // the reactive mode will execute each sub task in its own runnable task which is scheduled on the reactive executor
+        // which is how the routing engine normally operates
+        AbstractMulticastTask state = exchange.isTransacted()
+                ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
@@ -307,7 +314,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         }
     }
 
-    protected class MulticastTask implements Runnable {
+    protected abstract class AbstractMulticastTask implements Runnable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -321,7 +328,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
 
-        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+        AbstractMulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -339,6 +346,71 @@ public class MulticastProcessor extends AsyncProcessorSupport
             return "MulticastTask";
         }
 
+        protected void aggregate() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    Exchange exchange;
+                    while (!done.get() && (exchange = completion.poll()) != null) {
+                        doAggregate(result, exchange, original);
+                        if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
+                            doDone(result.get(), true);
+                        }
+                    }
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void timeout() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    while (nbAggregated.get() < nbExchangeSent.get()) {
+                        Exchange exchange = completion.pollUnordered();
+                        int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
+                        while (nbAggregated.get() < index) {
+                            AggregationStrategy strategy = getAggregationStrategy(null);
+                            strategy.timeout(result.get() != null ? result.get() : original,
+                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
+                        }
+                        if (exchange != null) {
+                            doAggregate(result, exchange, original);
+                            nbAggregated.incrementAndGet();
+                        }
+                    }
+                    doDone(result.get(), true);
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        protected void doDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            }
+        }
+    }
+
+    /**
+     * Sub task processed reactively via the {@link ReactiveExecutor}.
+     */
+    protected class MulticastTask extends AbstractMulticastTask {
+
+        public MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
         @Override
         public void run() {
             try {
@@ -421,59 +493,103 @@ public class MulticastProcessor extends AsyncProcessorSupport
             }
         }
 
-        protected void aggregate() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
+    }
+
+    /**
+     * Transacted sub task processed synchronously using {@link Processor#process(Exchange)} with the same thread in a
+     * while loop control flow.
+     */
+    protected class MulticastTransactedTask extends AbstractMulticastTask {
+
+        public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
+        @Override
+        public void run() {
+            boolean next = true;
+            while (next) {
                 try {
-                    Exchange exchange;
-                    while (!done.get() && (exchange = completion.poll()) != null) {
-                        doAggregate(result, exchange, original);
-                        if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
-                            doDone(result.get(), true);
-                        }
-                    }
-                } catch (Throwable e) {
+                    next = doRun();
+                } catch (Exception e) {
                     original.setException(e);
-                    // and do the done work
                     doDone(null, false);
-                } finally {
-                    lock.unlock();
+                    return;
                 }
             }
         }
 
-        protected void timeout() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
-                try {
-                    while (nbAggregated.get() < nbExchangeSent.get()) {
-                        Exchange exchange = completion.pollUnordered();
-                        int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
-                        while (nbAggregated.get() < index) {
-                            AggregationStrategy strategy = getAggregationStrategy(null);
-                            strategy.timeout(result.get() != null ? result.get() : original,
-                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
-                        }
-                        if (exchange != null) {
-                            doAggregate(result, exchange, original);
-                            nbAggregated.incrementAndGet();
-                        }
-                    }
-                    doDone(result.get(), true);
-                } catch (Throwable e) {
-                    original.setException(e);
-                    // and do the done work
-                    doDone(null, false);
-                } finally {
-                    lock.unlock();
+        boolean doRun() throws Exception {
+            if (done.get()) {
+                return false;
+            }
+
+            // Check if the iterator is empty
+            // This can happen the very first time we check the existence
+            // of an item before queuing the run.
+            // or some iterators may return true for hasNext() but then null in next()
+            if (!iterator.hasNext()) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            ProcessorExchangePair pair = iterator.next();
+            boolean hasNext = iterator.hasNext();
+            // some iterators may return true for hasNext() but then null in next()
+            if (pair == null && !hasNext) {
+                doDone(result.get(), true);
+                return false;
+            }
+
+            Exchange exchange = pair.getExchange();
+            int index = nbExchangeSent.getAndIncrement();
+            updateNewExchange(exchange, index, pairs, hasNext);
+
+            // Schedule the processing of the next pair
+            if (hasNext) {
+                if (isParallelProcessing()) {
+                    schedule(this);
                 }
+            } else {
+                allSent.set(true);
             }
-        }
 
-        protected void doDone(Exchange exchange, boolean forceExhaust) {
-            if (done.compareAndSet(false, true)) {
-                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            // process next
+
+            // compute time taken if sending to another endpoint
+            StopWatch watch = beforeSend(pair);
+            Processor sync = pair.getProcessor();
+            try {
+                sync.process(exchange);
+            } finally {
+                afterSend(pair, watch);
+            }
+
+            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
+            // remember to test for stop on exception and aggregate before copying back results
+            boolean continueProcessing = PipelineHelper.continueProcessing(exchange,
+                    "Multicast processing failed for number " + index, LOG);
+            if (stopOnException && !continueProcessing) {
+                if (exchange.getException() != null) {
+                    // wrap in exception to explain where it failed
+                    exchange.setException(new CamelExchangeException(
+                            "Multicast processing failed for number " + index, exchange, exchange.getException()));
+                } else {
+                    // we want to stop on exception, and the exception was handled by the error handler
+                    // this is similar to what the pipeline does, so we should do the same to not surprise end users
+                    // so we should set the failed exchange as the result and be done
+                    result.set(exchange);
+                }
+                // and do the done work
+                doDone(exchange, true);
+                return false;
             }
+
+            // aggregate exchanges if any
+            aggregate();
+
+            // next step
+            return true;
         }
     }
 
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
index 11c67f6..585bcf6 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_8.adoc
@@ -33,6 +33,15 @@ The method `isOnlyDynamicQueryParameters` is removed from `org.apache.camel.spi.
 
 The `onCompletion` EIP has fixed it could trigger multiple completions for a given `Exchange`
 
+=== Transactions and Multicast, Splitter, or Recipient List EIPs
+
+When using `transacted` in Camel routes with Multicast, Splitter, or Recipient List EIPs, then the execution stackframe
+may grow deep and could cause a stack overflow error. This has been fixed by refactoring the EIP into a special
+transacted mode and the existing reactive mode.
+
+We do not anticipate any issues but if you are using transactions and these EIPs then we would like to have feedback
+if you encounter any problems with upgrading.
+
 === camel-jackson
 
 When using XML DSL then `jsonView` has been renamed to `jsonViewTypeName` and made general available in the model,