You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:20 UTC
[camel] 03/11: Full asynchronous engine with low stack depth
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d17a41fb6182e2991c31cb256bc093bace6b5e26
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Sat Oct 20 00:48:12 2018 +0200
Full asynchronous engine with low stack depth
---
.../camel/spi/AsyncProcessorAwaitManager.java | 19 +-
camel-core/pom.xml | 5 +
camel-core/src/main/docs/eips/delay-eip.adoc | 2 +-
.../apache/camel/component/file/FileConsumer.java | 23 +-
.../apache/camel/component/seda/SedaProducer.java | 1 -
.../camel/component/timer/TimerConsumer.java | 5 +
.../impl/DefaultAsyncProcessorAwaitManager.java | 32 +-
.../apache/camel/impl/DefaultProducerCache.java | 12 +-
.../org/apache/camel/model/DelayDefinition.java | 12 +-
.../camel/processor/CamelInternalProcessor.java | 103 +-
.../camel/processor/ConvertBodyProcessor.java | 28 +-
.../camel/processor/DelayProcessorSupport.java | 23 +-
.../camel/processor/DelegateAsyncProcessor.java | 17 +-
.../camel/processor/DelegateSyncProcessor.java | 4 +-
.../org/apache/camel/processor/LoopProcessor.java | 176 +-
.../apache/camel/processor/MulticastProcessor.java | 784 +++------
.../java/org/apache/camel/processor/Pipeline.java | 165 +-
.../camel/processor/RedeliveryErrorHandler.java | 1757 +++++++++-----------
.../org/apache/camel/processor/Resequencer.java | 21 +-
.../org/apache/camel/processor/RoutingSlip.java | 4 +-
.../org/apache/camel/processor/SendProcessor.java | 10 +-
.../processor/SharedCamelInternalProcessor.java | 50 +-
.../java/org/apache/camel/processor/Splitter.java | 14 +-
.../apache/camel/processor/ThroughputLogger.java | 11 +-
.../org/apache/camel/processor/TryProcessor.java | 90 +-
.../processor/interceptor/DefaultChannel.java | 13 -
.../org/apache/camel/reifier/DelayReifier.java | 2 +-
.../apache/camel/support/AsyncProcessorHelper.java | 18 +-
.../org/apache/camel/support/ReactiveHelper.java | 156 ++
.../component/seda/SedaBlockWhenFullTest.java | 4 +-
.../seda/SedaDefaultBlockWhenFullTest.java | 4 +-
.../camel/impl/MultipleConsumersSupportTest.java | 8 +
...ngExchangesAsyncDelayShutdownGracefulTest.java} | 31 +-
.../impl/PendingExchangesShutdownGracefulTest.java | 2 +-
...ndingExchangesTwoRouteShutdownGracefulTest.java | 4 +-
.../EventNotifierFailureHandledEventsTest.java | 8 +-
.../CharlesSplitAndTryCatchRollbackIssueTest.java | 4 +-
...thAggregationStrategyThrowingExceptionTest.java | 11 +-
...terParallelRuntimeExceptionInHasNextOrNext.java | 12 -
...ThreadsRejectedExecutionWithDeadLetterTest.java | 4 +-
.../apache/camel/processor/LoopAsyncCopyTest.java | 3 -
.../camel/processor/LoopAsyncNoCopyTest.java | 3 -
.../MulticastParallelStopOnExceptionTest.java | 3 +-
.../processor/MulticastParallelStreamingTest.java | 2 +-
.../processor/MulticastStopOnExceptionTest.java | 2 +-
.../apache/camel/processor/NavigateRouteTest.java | 22 +-
...pientListContextScopedOnExceptionIssueTest.java | 7 +
.../processor/ShutdownCompleteAllTasksTest.java | 2 +-
.../ShutdownCompleteCurrentTaskOnlyTest.java | 4 +-
.../SplitterParallelStopOnExceptionTest.java | 2 +-
.../processor/SplitterStopOnExceptionTest.java | 2 +-
.../processor/ThreadsRejectedExecutionTest.java | 2 +-
.../camel/processor/UnmarshalProcessorTest.java | 2 +
.../async/AsyncEndpointCustomInterceptorTest.java | 9 +-
.../AsyncEndpointRecipientListParallel5Test.java | 3 +-
.../AsyncEndpointRecipientListParallelTest.java | 3 +-
.../routingslip/RoutingSlipEventNotifierTest.java | 1 +
.../java/org/apache/camel/util/FilterIterator.java | 88 +
.../main/java/org/apache/camel/util/IOHelper.java | 10 +-
.../util/concurrent/AsyncCompletionService.java | 167 ++
.../concurrent/AsyncCompletionServiceTest.java | 196 +++
.../processor/ShutdownCompleteAllTasksTest.xml | 2 +-
.../ShutdownCompleteCurrentTaskOnlyTest.xml | 2 +-
63 files changed, 2050 insertions(+), 2136 deletions(-)
diff --git a/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index d4f8bdd..dfcfdce 100644
--- a/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -19,6 +19,7 @@ package org.apache.camel.spi;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.StaticService;
@@ -122,22 +123,12 @@ public interface AsyncProcessorAwaitManager extends StaticService {
}
/**
- * Registers the exchange to await for the callback to be triggered by another thread which has taken over processing
- * this exchange. The current thread will await until that callback happens in the future (blocking until this happens).
+ * Process the given exchange sychronously.
*
- * @param exchange the exchange
- * @param latch the latch used to wait for other thread to signal when its done
+ * @param processor the async processor to call
+ * @param exchange the exchange to process
*/
- void await(Exchange exchange, CountDownLatch latch);
-
- /**
- * Triggered when the other thread is done processing the exchange, to signal to the waiting thread is done, and can take
- * over control to further process the exchange.
- *
- * @param exchange the exchange
- * @param latch the latch used to wait for other thread to signal when its done
- */
- void countDown(Exchange exchange, CountDownLatch latch);
+ void process(final AsyncProcessor processor, final Exchange exchange);
/**
* Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing
diff --git a/camel-core/pom.xml b/camel-core/pom.xml
index 51c6e83..6b08aaf 100644
--- a/camel-core/pom.xml
+++ b/camel-core/pom.xml
@@ -192,6 +192,11 @@
<!-- testing -->
<dependency>
+ <groupId>org.codehaus.woodstox</groupId>
+ <artifactId>woodstox-core-asl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/camel-core/src/main/docs/eips/delay-eip.adoc b/camel-core/src/main/docs/eips/delay-eip.adoc
index 00684a0..b1a3f4b 100644
--- a/camel-core/src/main/docs/eips/delay-eip.adoc
+++ b/camel-core/src/main/docs/eips/delay-eip.adoc
@@ -11,7 +11,7 @@ The Delay EIP supports 3 options which are listed below:
|===
| Name | Description | Default | Type
| *executorServiceRef* | Refers to a custom Thread Pool if asyncDelay has been enabled. | | String
-| *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
+| *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | true | Boolean
| *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
|===
// eip options: END
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 65aebda..446b6ef 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -48,11 +49,8 @@ public class FileConsumer extends GenericFileConsumer<File> {
this.endpointPath = endpoint.getConfiguration().getDirectory();
if (endpoint.getExtendedAttributes() != null) {
- this.extendedAttributes = new HashSet<>();
-
- for (String attribute : endpoint.getExtendedAttributes().split(",")) {
- extendedAttributes.add(attribute);
- }
+ List<String> attributes = Arrays.asList(endpoint.getExtendedAttributes().split(","));
+ this.extendedAttributes = new HashSet<>(attributes);
}
}
@@ -87,7 +85,7 @@ public class FileConsumer extends GenericFileConsumer<File> {
}
List<File> files = Arrays.asList(dirFiles);
if (getEndpoint().isPreSort()) {
- Collections.sort(files, (a, b) -> a.getAbsoluteFile().compareTo(a.getAbsoluteFile()));
+ files.sort(Comparator.comparing(File::getAbsoluteFile));
}
for (File file : dirFiles) {
@@ -99,7 +97,7 @@ public class FileConsumer extends GenericFileConsumer<File> {
// trace log as Windows/Unix can have different views what the file is?
if (log.isTraceEnabled()) {
log.trace("Found file: {} [isAbsolute: {}, isDirectory: {}, isFile: {}, isHidden: {}]",
- new Object[]{file, file.isAbsolute(), file.isDirectory(), file.isFile(), file.isHidden()});
+ file, file.isAbsolute(), file.isDirectory(), file.isFile(), file.isHidden());
}
// creates a generic file
@@ -205,13 +203,12 @@ public class FileConsumer extends GenericFileConsumer<File> {
// compute the file path as relative to the starting directory
File path;
- String endpointNormalized = FileUtil.normalizePath(endpointPath);
- if (file.getPath().startsWith(endpointNormalized + File.separator)) {
- // skip duplicate endpoint path
- path = new File(StringHelper.after(file.getPath(), endpointNormalized + File.separator));
- } else {
- path = new File(file.getPath());
+ String endpointNormalizedSep = FileUtil.normalizePath(endpointPath) + File.separator;
+ String p = file.getPath();
+ if (p.startsWith(endpointNormalizedSep)) {
+ p = p.substring(endpointNormalizedSep.length());
}
+ path = new File(p);
if (path.getParent() != null) {
answer.setRelativeFilePath(path.getParent() + File.separator + file.getName());
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index 38a059b..2bc5b0c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -99,7 +99,6 @@ public class SedaProducer extends DefaultAsyncProducer {
}
});
- log.trace("Adding Exchange to queue: {}", copy);
try {
// do not copy as we already did the copy
addToQueue(copy, false);
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index eab3bb7..1c81cb0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.StartupListener;
import org.apache.camel.Suspendable;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.ReactiveHelper;
/**
* The timer consumer.
@@ -174,6 +175,10 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S
}
protected void sendTimerExchange(long counter) {
+ ReactiveHelper.schedule(() -> doSendTimerExchange(counter), "Send timer exchange");
+ }
+
+ protected void doSendTimerExchange(long counter) {
final Exchange exchange = endpoint.createExchange();
exchange.setProperty(Exchange.TIMER_COUNTER, counter);
exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index d78e852..774ccd7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -23,8 +23,11 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
import org.apache.camel.NamedNode;
@@ -33,6 +36,7 @@ import org.apache.camel.processor.DefaultExchangeFormatter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceSupport;
public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
@@ -59,8 +63,33 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
this.exchangeFormatter = formatter;
}
- @Override
+ /**
+ * Calls the async version of the processor's process method and waits
+ * for it to complete before returning. This can be used by {@link AsyncProcessor}
+ * objects to implement their sync version of the process method.
+ * <p/>
+ * <b>Important:</b> This method is discouraged to be used, as its better to invoke the asynchronous
+ * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method, whenever possible.
+ *
+ * @param processor the processor
+ * @param exchange the exchange
+ * @throws Exception can be thrown if waiting is interrupted
+ */
+ public void process(final AsyncProcessor processor, final Exchange exchange) {
+ CountDownLatch latch = new CountDownLatch(1);
+ processor.process(exchange, doneSync -> countDown(exchange, latch));
+ if (latch.getCount() > 0) {
+ await(exchange, latch);
+ }
+ }
+
public void await(Exchange exchange, CountDownLatch latch) {
+ // Early exit for pending reactive queued work
+ do {
+ if (latch.getCount() <= 0) {
+ return;
+ }
+ } while (ReactiveHelper.executeFromQueue());
log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
try {
@@ -98,7 +127,6 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
}
}
- @Override
public void countDown(Exchange exchange, CountDownLatch latch) {
log.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
latch.countDown();
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
index 5638f9d..454215e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
@@ -298,17 +298,19 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
}
try {
- StopWatch sw = null;
+ // record timing for sending the exchange using the producer
+ StopWatch watch;
if (eventNotifierEnabled && exchange != null) {
boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
if (sending) {
- sw = new StopWatch();
+ watch = new StopWatch();
+ } else {
+ watch = null;
}
+ } else {
+ watch = null;
}
- // record timing for sending the exchange using the producer
- final StopWatch watch = sw;
-
// invoke the callback
return producerCallback.doInAsyncProducer(producer, exchange, doneSync -> {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
index 40cebdc..8d6c7f5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
@@ -41,7 +41,7 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
private ExecutorService executorService;
@XmlAttribute
private String executorServiceRef;
- @XmlAttribute @Metadata(defaultValue = "false")
+ @XmlAttribute @Metadata(defaultValue = "true")
private Boolean asyncDelayed;
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean callerRunsWhenRejected;
@@ -68,7 +68,7 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
return "Delay[" + getExpression() + " -> " + getOutputs() + "]";
}
- // Fluent API
+ // Fluent API
// -------------------------------------------------------------------------
/**
@@ -104,6 +104,14 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
}
/**
+ * Enables asynchronous delay which means the thread will <b>not</b> block while delaying.
+ */
+ public DelayDefinition syncDelayed() {
+ setAsyncDelayed(false);
+ return this;
+ }
+
+ /**
* To use a custom Thread Pool if asyncDelay has been enabled.
*/
public DelayDefinition executorService(ExecutorService executorService) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index d2e40ca..ce03628 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -46,6 +46,7 @@ import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
}
@Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ public boolean process(Exchange exchange, AsyncCallback ocallback) {
// ----------------------------------------------------------
// CAMEL END USER - READ ME FOR DEBUGGING TIPS
// ----------------------------------------------------------
@@ -134,7 +135,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
if (processor == null || !continueProcessing(exchange)) {
// no processor or we should not continue then we are done
- callback.done(true);
+ ocallback.done(true);
return true;
}
@@ -148,17 +149,37 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
states[i] = state;
} catch (Throwable e) {
exchange.setException(e);
- callback.done(true);
+ ocallback.done(true);
return true;
}
}
// create internal callback which will execute the advices in reverse order when done
- callback = new InternalCallback(states, exchange, callback);
+ AsyncCallback callback = doneSync -> {
+ try {
+ for (int i = advices.size() - 1; i >= 0; i--) {
+ CamelInternalProcessorAdvice task = advices.get(i);
+ Object state = states[i];
+ try {
+ task.after(exchange, state);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ // allow all advices to complete even if there was an exception
+ }
+ }
+ } finally {
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ START +++
+ // ----------------------------------------------------------
+ // callback must be called
+ ReactiveHelper.callback(ocallback);
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ END +++
+ // ----------------------------------------------------------
+ }
+ };
- // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
- Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
- if (exchange.isTransacted() || synchronous != null) {
+ if (exchange.isTransacted()) {
// must be synchronized for transacted exchanges
if (log.isTraceEnabled()) {
if (exchange.isTransacted()) {
@@ -196,21 +217,23 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
if (log.isTraceEnabled()) {
log.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
}
- boolean sync = processor.process(exchange, async);
+ processor.process(exchange, async);
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- // execute any after processor work (in current thread, not in the callback)
- if (uow != null) {
- uow.afterProcess(processor, exchange, callback, sync);
- }
+ ReactiveHelper.scheduleLast(() -> {
+ // execute any after processor work (in current thread, not in the callback)
+ if (uow != null) {
+ uow.afterProcess(processor, exchange, callback, false);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
- new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
- }
- return sync;
+ if (log.isTraceEnabled()) {
+ log.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
+ exchange.getExchangeId(), exchange);
+ }
+ }, "CamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
+ return false;
}
}
@@ -220,52 +243,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
}
/**
- * Internal callback that executes the after advices.
- */
- private final class InternalCallback implements AsyncCallback {
-
- private final Object[] states;
- private final Exchange exchange;
- private final AsyncCallback callback;
-
- private InternalCallback(Object[] states, Exchange exchange, AsyncCallback callback) {
- this.states = states;
- this.exchange = exchange;
- this.callback = callback;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void done(boolean doneSync) {
- // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
- // so you can step straight to the finally block and invoke the callback
-
- // we should call after in reverse order
- try {
- for (int i = advices.size() - 1; i >= 0; i--) {
- CamelInternalProcessorAdvice task = advices.get(i);
- Object state = states[i];
- try {
- task.after(exchange, state);
- } catch (Throwable e) {
- exchange.setException(e);
- // allow all advices to complete even if there was an exception
- }
- }
- } finally {
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ START +++
- // ----------------------------------------------------------
- // callback must be called
- callback.done(doneSync);
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ END +++
- // ----------------------------------------------------------
- }
- }
- }
-
- /**
* Strategy to determine if we should continue processing the {@link Exchange}.
*/
protected boolean continueProcessing(Exchange exchange) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index bd1b350..70b717d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -64,25 +64,18 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
boolean out = exchange.hasOut();
Message old = out ? exchange.getOut() : exchange.getIn();
if (old.getBody() == null) {
// only convert if there is a body
- callback.done(true);
- return true;
+ return;
}
if (exchange.getException() != null) {
// do not convert if an exception has been thrown as if we attempt to convert and it also fails with a new
// exception then it will override the existing exception
- callback.done(true);
- return true;
+ return;
}
String originalCharsetName = null;
@@ -93,14 +86,7 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
exchange.setProperty(Exchange.CHARSET_NAME, charset);
}
// use mandatory conversion
- Object value;
- try {
- value = old.getMandatoryBody(type);
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
+ Object value = old.getMandatoryBody(type);
// create a new message container so we do not drag specialized message objects along
// but that is only needed if the old message is a specialized message
@@ -126,7 +112,15 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
exchange.removeProperty(Exchange.CHARSET_NAME);
}
}
+ }
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
callback.done(true);
return true;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index 5202fce..1bba746 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -25,6 +25,8 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +37,12 @@ import org.slf4j.LoggerFactory;
* <p/>
* This implementation will block while waiting.
*/
-public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
+public abstract class DelayProcessorSupport extends DelegateAsyncProcessor implements ShutdownAware {
private final CamelContext camelContext;
private final ScheduledExecutorService executorService;
private final boolean shutdownExecutorService;
- private boolean asyncDelayed;
+ private boolean asyncDelayed = true;
private boolean callerRunsWhenRejected = true;
private final AtomicInteger delayedCount = new AtomicInteger(0);
@@ -247,6 +249,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
protected void doStart() throws Exception {
if (isAsyncDelayed()) {
ObjectHelper.notNull(executorService, "executorService", this);
+ } else if (executorService != null) {
+ asyncDelayed = true;
}
super.doStart();
}
@@ -258,4 +262,19 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
}
super.doShutdown();
}
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return getDelayedCount();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
index 29594d4..8f3d0f4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -86,22 +85,10 @@ public class DelegateAsyncProcessor extends ServiceSupport implements DelegatePr
ServiceHelper.stopAndShutdownServices(processor);
}
+ @Override
public void process(Exchange exchange) throws Exception {
- // inline org.apache.camel.support.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
- // to optimize and reduce stacktrace lengths
final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
- final CountDownLatch latch = new CountDownLatch(1);
- // call the asynchronous method and wait for it to be done
- boolean sync = process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- awaitManager.countDown(exchange, latch);
- }
- }
- });
- if (!sync) {
- awaitManager.await(exchange, latch);
- }
+ awaitManager.process(this, exchange);
}
public boolean process(final Exchange exchange, final AsyncCallback callback) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
index 653a736..963eec0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
@@ -64,9 +64,9 @@ public class DelegateSyncProcessor extends ServiceSupport implements org.apache.
exchange.setException(e);
} finally {
// we are bridging a sync processor as async so callback with true
- callback.done(true);
+ callback.done(false);
}
- return true;
+ return false;
}
@Override
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 6dc8888..92614a0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -16,17 +16,16 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -49,133 +48,84 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- // use atomic integer to be able to pass reference and keep track on the values
- AtomicInteger index = new AtomicInteger();
- AtomicInteger count = new AtomicInteger();
- AtomicBoolean doWhile = new AtomicBoolean();
-
try {
- if (expression != null) {
- // Intermediate conversion to String is needed when direct conversion to Integer is not available
- // but evaluation result is a textual representation of a numeric value.
- String text = expression.evaluate(exchange, String.class);
- int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
- count.set(num);
- } else {
- boolean result = predicate.matches(exchange);
- doWhile.set(result);
- }
+
+ LoopState state = new LoopState(exchange, callback);
+
+ ReactiveHelper.scheduleMain(state);
+ return false;
+
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
return true;
}
+ }
- // we hold on to the original Exchange in case it's needed for copies
- final Exchange original = exchange;
-
- // per-iteration exchange
- Exchange target = exchange;
-
- // set the size before we start
- if (predicate == null) {
- exchange.setProperty(Exchange.LOOP_SIZE, count);
- }
+ /**
+ * Class holding state for loop processing
+ */
+ class LoopState implements Runnable {
- // loop synchronously
- while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
-
- // and prepare for next iteration
- // if (!copy) target = exchange; else copy of original
- target = prepareExchange(exchange, index.get(), original);
- // the following process method will in the done method re-evaluate the predicate
- // so we do not need to do it here as well
- boolean sync = process(target, callback, index, count, doWhile, original);
-
- if (!sync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
- // the remainder of the loop will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
- }
+ final Exchange exchange;
+ final AsyncCallback callback;
+ Exchange current;
+ int index;
+ int count;
- log.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
+ public LoopState(Exchange exchange, AsyncCallback callback) throws NoTypeConversionAvailableException {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.current = exchange;
- // check for error if so we should break out
- if (!continueProcessing(target, "so breaking out of loop", log)) {
- break;
+ // evaluate expression / predicate
+ if (expression != null) {
+ // Intermediate conversion to String is needed when direct conversion to Integer is not available
+ // but evaluation result is a textual representation of a numeric value.
+ String text = expression.evaluate(exchange, String.class);
+ count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+ exchange.setProperty(Exchange.LOOP_SIZE, count);
}
}
- // we are done so prepare the result
- ExchangeHelper.copyResults(exchange, target);
- log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- callback.done(true);
- return true;
- }
-
- protected boolean process(final Exchange exchange, final AsyncCallback callback,
- final AtomicInteger index, final AtomicInteger count, final AtomicBoolean doWhile,
- final Exchange original) {
-
- // set current index as property
- log.debug("LoopProcessor: iteration #{}", index.get());
- exchange.setProperty(Exchange.LOOP_INDEX, index.get());
-
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // increment counter after done
- index.getAndIncrement();
-
- // evaluate predicate for next loop
- if (predicate != null && index.get() > 0) {
- try {
- boolean result = predicate.matches(exchange);
- doWhile.set(result);
- } catch (Exception e) {
- // break out looping due that exception
- exchange.setException(e);
- doWhile.set(false);
- }
- }
-
- // we only have to handle async completion of the loop
- // (as the sync is done in the outer processor)
- if (doneSync) {
- return;
- }
-
- Exchange target = exchange;
-
- // continue looping asynchronously
- while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
-
- // check for error if so we should break out
- if (!continueProcessing(target, "so breaking out of loop", log)) {
- break;
- }
+ @Override
+ public void run() {
+ try {
+ // check for error if so we should break out
+ boolean cont = continueProcessing(current, "so breaking out of loop", log);
+ boolean doWhile = predicate == null || predicate.matches(current);
+ boolean doLoop = expression == null || index < count;
+ // iterate
+ if (cont && doWhile && doLoop) {
// and prepare for next iteration
- target = prepareExchange(exchange, index.get(), original);
-
- // process again
- boolean sync = process(target, callback, index, count, doWhile, original);
- if (!sync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
- // the remainder of the routing slip will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return;
- }
+ current = prepareExchange(exchange, index);
+
+ // set current index as property
+ log.debug("LoopProcessor: iteration #{}", index);
+ current.setProperty(Exchange.LOOP_INDEX, index);
+
+ processor.process(current, doneSync -> {
+ // increment counter after done
+ index++;
+ ReactiveHelper.schedule(this);
+ });
+ } else {
+ // we are done so prepare the result
+ ExchangeHelper.copyResults(exchange, current);
+ log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ callback.done(false);
}
-
- // we are done so prepare the result
- ExchangeHelper.copyResults(original, target);
- log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ } catch (Exception e) {
+ log.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
+ exchange.setException(e);
callback.done(false);
}
- });
+ }
- return sync;
+ public String toString() {
+ return "LoopState[" + exchange.getExchangeId() + "]";
+ }
}
/**
@@ -185,11 +135,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
* @param index the index of the next iteration
* @return the exchange to use
*/
- protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
+ protected Exchange prepareExchange(Exchange exchange, int index) {
if (copy) {
// use a copy but let it reuse the same exchange id so it appear as one exchange
// use the original exchange rather than the looping exchange (esp. with the async routing engine)
- return ExchangeHelper.createCopy(original, true);
+ return ExchangeHelper.createCopy(exchange, true);
} else {
ExchangeHelper.prepareOutToIn(exchange);
return exchange;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 731947f..5e6c991 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -22,19 +22,17 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
@@ -59,14 +57,15 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FilterIterator;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.KeyValueHolder;
-import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
-import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
+import org.apache.camel.util.concurrent.AsyncCompletionService;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -215,615 +214,247 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- final AtomicReference<Exchange> result = new AtomicReference<>();
- Iterable<ProcessorExchangePair> pairs = null;
-
+ Iterable<ProcessorExchangePair> pairs;
try {
- boolean sync = true;
-
pairs = createProcessorExchangePairs(exchange);
-
- if (isParallelProcessing()) {
- // ensure an executor is set when running in parallel
- ObjectHelper.notNull(executorService, "executorService", this);
- doProcessParallel(exchange, result, pairs, isStreaming(), callback);
- } else {
- sync = doProcessSequential(exchange, result, pairs, callback);
- }
-
- if (!sync) {
- // the remainder of the multicast will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
- }
} catch (Throwable e) {
exchange.setException(e);
// unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
// and do the done work
- doDone(exchange, null, pairs, callback, true, false);
+ doDone(exchange, null, null, callback, true, false);
return true;
}
- // multicasting was processed successfully
- // and do the done work
- Exchange subExchange = result.get() != null ? result.get() : null;
- doDone(exchange, subExchange, pairs, callback, true, true);
- return true;
- }
-
- protected void doProcessParallel(final Exchange original, final AtomicReference<Exchange> result, final Iterable<ProcessorExchangePair> pairs,
- final boolean streaming, final AsyncCallback callback) throws Exception {
-
- ObjectHelper.notNull(executorService, "ExecutorService", this);
- ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);
-
- final CompletionService<Exchange> completion;
- if (streaming) {
- // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
- completion = new ExecutorCompletionService<>(executorService);
+ MulticastState state = new MulticastState(exchange, pairs, callback);
+ if (isParallelProcessing()) {
+ executorService.submit(() -> ReactiveHelper.schedule(state));
} else {
- // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
- completion = new SubmitOrderedCompletionService<>(executorService);
- }
-
- final AtomicInteger total = new AtomicInteger(0);
- final Iterator<ProcessorExchangePair> it = pairs.iterator();
-
- if (it.hasNext()) {
- // when parallel then aggregate on the fly
- final AtomicBoolean running = new AtomicBoolean(true);
- final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
- final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
- final AtomicReference<Exception> executionException = new AtomicReference<>();
-
- // issue task to execute in separate thread so it can aggregate on-the-fly
- // while we submit new tasks, and those tasks complete concurrently
- // this allows us to optimize work and reduce memory consumption
- final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
- aggregationOnTheFlyDone, allTasksSubmitted, executionException);
- final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
-
- log.trace("Starting to submit parallel tasks");
-
- try {
- while (it.hasNext()) {
- final ProcessorExchangePair pair = it.next();
- // in case the iterator returns null then continue to next
- if (pair == null) {
- continue;
- }
-
- final Exchange subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.intValue(), pairs, it);
-
- completion.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- // start the aggregation task at this stage only in order not to pile up too many threads
- if (aggregationTaskSubmitted.compareAndSet(false, true)) {
- // but only submit the aggregation task once
- aggregateExecutorService.submit(aggregateOnTheFlyTask);
- }
-
- if (!running.get()) {
- // do not start processing the task if we are not running
- return subExchange;
- }
-
- try {
- doProcessParallel(pair);
- } catch (Throwable e) {
- subExchange.setException(e);
- }
-
- // Decide whether to continue with the multicast or not; similar logic to the Pipeline
- Integer number = getExchangeIndex(subExchange);
- boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, log);
- if (stopOnException && !continueProcessing) {
- // signal to stop running
- running.set(false);
- // throw caused exception
- if (subExchange.getException() != null) {
- // wrap in exception to explain where it failed
- CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
- subExchange.setException(cause);
- }
- }
-
- log.trace("Parallel processing complete for exchange: {}", subExchange);
- return subExchange;
- }
- });
-
- total.incrementAndGet();
- }
- } catch (Throwable e) {
- // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented.
- // We have to catch the exception here otherwise the aggregator threads would pile up.
- if (e instanceof Exception) {
- executionException.set((Exception) e);
- } else {
- executionException.set(RuntimeCamelException.wrapRuntimeCamelException(e));
- }
- // and because of the exception we must signal we are done so the latch can open and let the other thread continue processing
- log.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
- log.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
- aggregationOnTheFlyDone.countDown();
- }
-
- // signal all tasks has been submitted
- log.trace("Signaling that all {} tasks has been submitted.", total.get());
- allTasksSubmitted.set(true);
-
- // its to hard to do parallel async routing so we let the caller thread be synchronously
- // and have it pickup the replies and do the aggregation (eg we use a latch to wait)
- // wait for aggregation to be done
- log.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
- aggregationOnTheFlyDone.await();
-
- // did we fail for whatever reason, if so throw that caused exception
- if (executionException.get() != null) {
- if (log.isDebugEnabled()) {
- log.debug("Parallel processing failed due {}", executionException.get().getMessage());
- }
- throw executionException.get();
- }
+ ReactiveHelper.scheduleMain(state);
}
- // no everything is okay so we are done
- log.debug("Done parallel processing {} exchanges", total);
+ // the remainder of the multicast will be completed async
+ // so we break out now, then the callback will be invoked which then
+ // continue routing from where we left here
+ return false;
}
- /**
- * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing.
- * <p/>
- * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
- * before we perform aggregation. Instead this separate thread will run and aggregate when new
- * completed tasks is done.
- * <p/>
- * The logic is fairly complex as this implementation has to keep track how far it got, and also
- * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue
- * processing when the entire splitting is done.
- */
- private final class AggregateOnTheFlyTask implements Runnable {
-
- private final AtomicReference<Exchange> result;
- private final Exchange original;
- private final AtomicInteger total;
- private final CompletionService<Exchange> completion;
- private final AtomicBoolean running;
- private final CountDownLatch aggregationOnTheFlyDone;
- private final AtomicBoolean allTasksSubmitted;
- private final AtomicReference<Exception> executionException;
-
- private AggregateOnTheFlyTask(AtomicReference<Exchange> result, Exchange original, AtomicInteger total,
- CompletionService<Exchange> completion, AtomicBoolean running,
- CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
- AtomicReference<Exception> executionException) {
- this.result = result;
- this.original = original;
- this.total = total;
- this.completion = completion;
- this.running = running;
- this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
- this.allTasksSubmitted = allTasksSubmitted;
- this.executionException = executionException;
- }
-
- public void run() {
- log.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());
-
- try {
- aggregateOnTheFly();
- } catch (Throwable e) {
- if (e instanceof Exception) {
- executionException.set((Exception) e);
- } else {
- executionException.set(RuntimeCamelException.wrapRuntimeCamelException(e));
- }
- } finally {
- // must signal we are done so the latch can open and let the other thread continue processing
- log.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
- log.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
- aggregationOnTheFlyDone.countDown();
- }
- }
-
- private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
- final AtomicBoolean timedOut = new AtomicBoolean();
- boolean stoppedOnException = false;
- final StopWatch watch = new StopWatch();
- final AtomicInteger aggregated = new AtomicInteger();
- boolean done = false;
- // not a for loop as on the fly may still run
- while (!done) {
- // check if we have already aggregate everything
- if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
- log.debug("Done aggregating {} exchanges on the fly.", aggregated);
- break;
- }
-
- Future<Exchange> future;
- if (timedOut.get()) {
- // we are timed out but try to grab if some tasks has been completed
- // poll will return null if no tasks is present
- future = completion.poll();
- log.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
- } else if (timeout > 0) {
- long left = timeout - watch.taken();
- if (left < 0) {
- left = 0;
- }
- log.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
- future = completion.poll(left, TimeUnit.MILLISECONDS);
- } else {
- log.trace("Polling completion task #{}", aggregated);
- // we must not block so poll every second
- future = completion.poll(1, TimeUnit.SECONDS);
- if (future == null) {
- // and continue loop which will recheck if we are done
- continue;
- }
- }
-
- if (future == null) {
- ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
- if (parallelAggregate) {
- aggregateExecutorService.submit(task);
- } else {
- // in non parallel mode then just run the task
- task.run();
- }
- } else {
- // there is a result to aggregate
- Exchange subExchange = future.get();
-
- // Decide whether to continue with the multicast or not; similar logic to the Pipeline
- Integer number = getExchangeIndex(subExchange);
- boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, log);
- if (stopOnException && !continueProcessing) {
- // we want to stop on exception and an exception or failure occurred
- // this is similar to what the pipeline does, so we should do the same to not surprise end users
- // so we should set the failed exchange as the result and break out
- result.set(subExchange);
- stoppedOnException = true;
- break;
- }
-
- // we got a result so aggregate it
- ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
- if (parallelAggregate) {
- aggregateExecutorService.submit(task);
- } else {
- // in non parallel mode then just run the task
- task.run();
- }
- }
- }
-
- if (timedOut.get() || stoppedOnException) {
- if (timedOut.get()) {
- log.debug("Cancelling tasks due timeout after {} millis.", timeout);
- }
- if (stoppedOnException) {
- log.debug("Cancelling tasks due stopOnException.");
- }
- // cancel tasks as we timed out (its safe to cancel done tasks)
- running.set(false);
- }
+ protected void schedule(Runnable runnable) {
+ if (isParallelProcessing()) {
+ executorService.submit(() -> ReactiveHelper.schedule(runnable));
+ } else {
+ ReactiveHelper.scheduleLast(runnable, "Multicast next step");
}
}
- /**
- * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
- */
- private final class ParallelAggregateTask implements Runnable {
-
- private final AtomicReference<Exchange> result;
- private final Exchange subExchange;
- private final AtomicInteger aggregated;
+ protected class MulticastState implements Runnable {
- private ParallelAggregateTask(AtomicReference<Exchange> result, Exchange subExchange, AtomicInteger aggregated) {
- this.result = result;
- this.subExchange = subExchange;
- this.aggregated = aggregated;
- }
+ final Exchange original;
+ final Iterable<ProcessorExchangePair> pairs;
+ final AsyncCallback callback;
+ final Iterator<ProcessorExchangePair> iterator;
+ final ReentrantLock lock;
+ final AsyncCompletionService<Exchange> completion;
+ final AtomicReference<Exchange> result;
+ final AtomicInteger nbExchangeSent = new AtomicInteger();
+ final AtomicInteger nbAggregated = new AtomicInteger();
+ final AtomicBoolean allSent = new AtomicBoolean();
+ final AtomicBoolean done = new AtomicBoolean();
- @Override
- public void run() {
- try {
- if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
- } else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
- }
- } catch (Throwable e) {
- if (isStopOnAggregateException()) {
- throw e;
- } else {
- // wrap in exception to explain where it failed
- CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e);
- subExchange.setException(cex);
- log.debug(cex.getMessage(), cex);
- }
- } finally {
- aggregated.incrementAndGet();
+ MulticastState(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+ this.original = original;
+ this.pairs = pairs;
+ this.callback = callback;
+ this.iterator = pairs.iterator();
+ this.lock = new ReentrantLock();
+ this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock);
+ this.result = new AtomicReference<>();
+ if (timeout > 0) {
+ schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
}
}
- }
-
- /**
- * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
- */
- private final class ParallelAggregateTimeoutTask implements Runnable {
- private final Exchange original;
- private final AtomicReference<Exchange> result;
- private final CompletionService<Exchange> completion;
- private final AtomicInteger aggregated;
- private final AtomicInteger total;
- private final AtomicBoolean timedOut;
-
- private ParallelAggregateTimeoutTask(Exchange original, AtomicReference<Exchange> result, CompletionService<Exchange> completion,
- AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
- this.original = original;
- this.result = result;
- this.completion = completion;
- this.aggregated = aggregated;
- this.total = total;
- this.timedOut = timedOut;
+ public String toString() {
+ return "Step[" + original.getExchangeId() + "," + MulticastProcessor.this + "]";
}
- @Override
public void run() {
- AggregationStrategy strategy = getAggregationStrategy(null);
- // notify the strategy we timed out
- Exchange oldExchange = result.get();
- if (oldExchange == null) {
- // if they all timed out the result may not have been set yet, so use the original exchange
- oldExchange = original;
- }
- strategy.timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
- log.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
- timedOut.set(true);
-
- // mark that index as timed out, which allows us to try to retrieve
- // any already completed tasks in the next loop
- if (completion instanceof SubmitOrderedCompletionService) {
- ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
- }
-
- // we timed out so increment the counter
- aggregated.incrementAndGet();
- }
- }
-
- protected boolean doProcessSequential(Exchange original, AtomicReference<Exchange> result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
- AtomicInteger total = new AtomicInteger();
- Iterator<ProcessorExchangePair> it = pairs.iterator();
-
- while (it.hasNext()) {
- ProcessorExchangePair pair = it.next();
- // in case the iterator returns null then continue to next
- if (pair == null) {
- continue;
- }
- Exchange subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.get(), pairs, it);
-
- boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
- if (!sync) {
- if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
- }
- // the remainder of the multicast will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
- }
-
- if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
- }
-
- // Decide whether to continue with the multicast or not; similar logic to the Pipeline
- // remember to test for stop on exception and aggregate before copying back results
- boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
- if (stopOnException && !continueProcessing) {
- if (subExchange.getException() != null) {
- // wrap in exception to explain where it failed
- CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
- subExchange.setException(cause);
+ try {
+ if (done.get()) {
+ return;
}
- // we want to stop on exception, and the exception was handled by the error handler
- // this is similar to what the pipeline does, so we should do the same to not surprise end users
- // so we should set the failed exchange as the result and be done
- result.set(subExchange);
- return true;
- }
-
- log.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);
-
- if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
- } else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
- }
-
- total.incrementAndGet();
- }
-
- log.debug("Done sequential processing {} exchanges", total);
-
- return true;
- }
- private boolean doProcessSequential(final Exchange original, final AtomicReference<Exchange> result,
- final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
- final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
- boolean sync = true;
-
- final Exchange exchange = pair.getExchange();
- Processor processor = pair.getProcessor();
- final Producer producer = pair.getProducer();
-
- try {
- StopWatch sw = null;
- if (producer != null) {
- boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
- if (sending) {
- sw = new StopWatch();
+ // Check if the iterator is empty
+ // This can only happen the very first time we check the existence
+ // of an item before queuing the run.
+ if (!iterator.hasNext()) {
+ doDone(null, true);
+ return;
}
- }
-
- // compute time taken if sending to another endpoint
- final StopWatch watch = sw;
-
- // let the prepared process it, remember to begin the exchange pair
- AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
- pair.begin();
- sync = async.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // we are done with the exchange pair
- pair.done();
-
- // okay we are done, so notify the exchange was sent
- if (producer != null && watch != null) {
- long timeTaken = watch.taken();
- Endpoint endpoint = producer.getEndpoint();
- // emit event that the exchange was sent to the endpoint
- EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
- }
- // we only have to handle async completion of the routing slip
- if (doneSync) {
- return;
- }
+ ProcessorExchangePair pair = iterator.next();
+ boolean hasNext = iterator.hasNext();
+ Exchange exchange = pair.getExchange();
- // continue processing the multicast asynchronously
- Exchange subExchange = exchange;
-
- // Decide whether to continue with the multicast or not; similar logic to the Pipeline
- // remember to test for stop on exception and aggregate before copying back results
- boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
- if (stopOnException && !continueProcessing) {
- if (subExchange.getException() != null) {
- // wrap in exception to explain where it failed
- subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
- } else {
- // we want to stop on exception, and the exception was handled by the error handler
- // this is similar to what the pipeline does, so we should do the same to not surprise end users
- // so we should set the failed exchange as the result and be done
- result.set(subExchange);
- }
- // and do the done work
- doDone(original, subExchange, pairs, callback, false, true);
- return;
- }
+ int index = nbExchangeSent.getAndIncrement();
+ updateNewExchange(exchange, index, pairs, hasNext);
- try {
- if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
- } else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
- }
- } catch (Throwable e) {
- original.setException(e);
- // and do the done work
- doDone(original, null, pairs, callback, false, true);
- return;
+ // Schedule the processing of the next pair
+ if (hasNext) {
+ if (isParallelProcessing()) {
+ schedule(this);
}
+ } else {
+ allSent.set(true);
+ }
- total.incrementAndGet();
-
- // maybe there are more processors to multicast
- while (it.hasNext()) {
-
- // prepare and run the next
- ProcessorExchangePair pair = it.next();
- subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.get(), pairs, it);
- boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
+ completion.submit(exchangeResult -> {
+ // compute time taken if sending to another endpoint
+ StopWatch watch = beforeSend(pair);
- if (!sync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
- return;
- }
+ AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor());
+ async.process(exchange, doneSync -> {
+ afterSend(pair, watch);
// Decide whether to continue with the multicast or not; similar logic to the Pipeline
// remember to test for stop on exception and aggregate before copying back results
- continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
+ boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Multicast processing failed for number " + index, log);
if (stopOnException && !continueProcessing) {
- if (subExchange.getException() != null) {
+ if (exchange.getException() != null) {
// wrap in exception to explain where it failed
- subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+ exchange.setException(new CamelExchangeException("Multicast processing failed for number " + index, exchange, exchange.getException()));
} else {
// we want to stop on exception, and the exception was handled by the error handler
// this is similar to what the pipeline does, so we should do the same to not surprise end users
// so we should set the failed exchange as the result and be done
- result.set(subExchange);
+ result.set(exchange);
}
// and do the done work
- doDone(original, subExchange, pairs, callback, false, true);
+ doDone(exchange, true);
return;
}
- // must catch any exceptions from aggregation
- try {
- if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
- } else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
- }
- } catch (Throwable e) {
- // wrap in exception to explain where it failed
- subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
- // and do the done work
- doDone(original, subExchange, pairs, callback, false, true);
- return;
+ exchangeResult.accept(exchange);
+
+ // aggregate exchanges if any
+ aggregate();
+
+ // next step
+ if (hasNext && !isParallelProcessing()) {
+ schedule(this);
}
+ });
+ });
+ } catch (Exception e) {
+ original.setException(e);
+ doDone(null, false);
+ }
+ }
- total.incrementAndGet();
+ protected void aggregate() {
+ Lock lock = this.lock;
+ if (lock.tryLock()) {
+ try {
+ Exchange exchange;
+ while (!done.get() && (exchange = completion.poll()) != null) {
+ doAggregate(result, exchange);
+ if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
+ doDone(result.get(), true);
+ }
}
+ } catch (Throwable e) {
+ original.setException(e);
+ // and do the done work
+ doDone(null, false);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
- // do the done work
- subExchange = result.get() != null ? result.get() : null;
- doDone(original, subExchange, pairs, callback, false, true);
+ protected void timeout() {
+ Lock lock = this.lock;
+ if (lock.tryLock()) {
+ try {
+ while (nbAggregated.get() < nbExchangeSent.get()) {
+ Exchange exchange = completion.pollUnordered();
+ int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
+ while (nbAggregated.get() < index) {
+ AggregationStrategy strategy = getAggregationStrategy(null);
+ strategy.timeout(result.get() != null ? result.get() : original,
+ nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
+ }
+ if (exchange != null) {
+ doAggregate(result, exchange);
+ nbAggregated.incrementAndGet();
+ }
+ }
+ doDone(result.get(), true);
+ } catch (Throwable e) {
+ original.setException(e);
+ // and do the done work
+ doDone(null, false);
+ } finally {
+ lock.unlock();
}
- });
- } finally {
+ }
}
- return sync;
+ protected void doDone(Exchange exchange, boolean forceExhaust) {
+ if (done.compareAndSet(false, true)) {
+ MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+ }
+ }
}
- private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
- final Exchange exchange = pair.getExchange();
- Processor processor = pair.getProcessor();
- Producer producer = pair.getProducer();
-
- // compute time taken if sending to another endpoint
- StopWatch watch = null;
- try {
- if (producer != null) {
- boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
- if (sending) {
- watch = new StopWatch();
+ protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
+ if (executor instanceof ScheduledExecutorService) {
+ ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
+ } else {
+ executor.execute(() -> {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // ignore
}
+ runnable.run();
+ });
+ }
+ }
+
+ protected StopWatch beforeSend(ProcessorExchangePair pair) {
+ StopWatch watch;
+ final Exchange e = pair.getExchange();
+ final Producer p = pair.getProducer();
+ if (p != null) {
+ boolean sending = EventHelper.notifyExchangeSending(e.getContext(), e, p.getEndpoint());
+ if (sending) {
+ watch = new StopWatch();
+ } else {
+ watch = null;
}
- // let the prepared process it, remember to begin the exchange pair
- AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
- pair.begin();
- // we invoke it synchronously as parallel async routing is too hard
- AsyncProcessorHelper.process(async, exchange);
- } finally {
- pair.done();
- if (producer != null && watch != null) {
- Endpoint endpoint = producer.getEndpoint();
- long timeTaken = watch.taken();
- // emit event that the exchange was sent to the endpoint
- // this is okay to do here in the finally block, as the processing is not using the async routing engine
- //( we invoke it synchronously as parallel async routing is too hard)
- EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
- }
+ } else {
+ watch = null;
+ }
+
+ // let the prepared process it, remember to begin the exchange pair
+ pair.begin();
+
+ // return the watch
+ return watch;
+ }
+
+ protected void afterSend(ProcessorExchangePair pair, StopWatch watch) {
+ // we are done with the exchange pair
+ pair.done();
+
+ // okay we are done, so notify the exchange was sent
+ final Producer producer = pair.getProducer();
+ if (producer != null && watch != null) {
+ long timeTaken = watch.taken();
+ final Exchange e = pair.getExchange();
+ Endpoint endpoint = producer.getEndpoint();
+ // emit event that the exchange was sent to the endpoint
+ EventHelper.notifyExchangeSent(e.getContext(), e, endpoint, timeTaken);
}
}
@@ -886,11 +517,28 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
if (exception) {
// multicast uses error handling on its output processors and they have tried to redeliver
// so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
+ // also try to redeliver as we would then do that twice
original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
}
- callback.done(doneSync);
+ ReactiveHelper.callback(callback);
+ }
+
+ /**
+ * Aggregate the {@link Exchange} with the current result.
+ * This method is synchronized and is called directly when parallelAggregate is disabled (by default).
+ *
+ * @param result the current result
+ * @param exchange the exchange to be added to the result
+ * @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+ * @see #doAggregateSync(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+ */
+ protected void doAggregate(AtomicReference<Exchange> result, Exchange exchange) {
+ if (parallelAggregate) {
+ doAggregateInternal(getAggregationStrategy(exchange), result, exchange);
+ } else {
+ doAggregateSync(getAggregationStrategy(exchange), result, exchange);
+ }
}
/**
@@ -902,7 +550,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
* @param exchange the exchange to be added to the result
* @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
*/
- protected synchronized void doAggregate(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
+ protected synchronized void doAggregateSync(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
doAggregateInternal(strategy, result, exchange);
}
@@ -914,7 +562,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
* @param strategy the aggregation strategy to use
* @param result the current result
* @param exchange the exchange to be added to the result
- * @see #doAggregate
+ * @see #doAggregateSync
*/
protected void doAggregateInternal(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
if (strategy != null) {
@@ -925,10 +573,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
}
- protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
- Iterator<ProcessorExchangePair> it) {
+ protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
exchange.setProperty(Exchange.MULTICAST_INDEX, index);
- if (it.hasNext()) {
+ if (hasNext) {
exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
} else {
exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
@@ -1119,10 +766,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
if (isParallelProcessing() && executorService == null) {
throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
}
- if (timeout > 0 && !isParallelProcessing()) {
- throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
- }
- if (isParallelProcessing() && aggregateExecutorService == null) {
+ if (aggregateExecutorService == null) {
// use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
// and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
// and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing
@@ -1145,7 +789,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
*/
protected synchronized ExecutorService createAggregateExecutorService(String name) {
// use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
- return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+ return camelContext.getExecutorServiceManager().newScheduledThreadPool(this, name, 0);
}
@Override
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index 31c93de..e544e0f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -20,15 +20,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.Traceable;
+import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
+import org.apache.camel.support.ServiceHelper;
+import org.apache.camel.support.ServiceSupport;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -36,12 +43,15 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
* Creates a Pipeline pattern where the output of the previous step is sent as
* input to the next step, reusing the same message exchanges
*/
-public class Pipeline extends MulticastProcessor {
+public class Pipeline extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+ private final CamelContext camelContext;
+ private List<AsyncProcessor> processors;
private String id;
public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
- super(camelContext, processors);
+ this.camelContext = camelContext;
+ this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
}
public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
@@ -77,130 +87,74 @@ public class Pipeline extends MulticastProcessor {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- Iterator<Processor> processors = getProcessors().iterator();
- Exchange nextExchange = exchange;
- boolean first = true;
-
- while (continueRouting(processors, nextExchange)) {
- if (first) {
- first = false;
- } else {
- // prepare for next run
- nextExchange = createNextExchange(nextExchange);
- }
-
- // get the next processor
- Processor processor = processors.next();
+ ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+ "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+ return false;
+ }
- AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
- boolean sync = process(exchange, nextExchange, callback, processors, async);
+ protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) {
+ if (continueRouting(processors, exchange)
+ && (first || continueProcessing(exchange, "so breaking out of pipeline", log))) {
- // continue as long its being processed synchronously
- if (!sync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
- // the remainder of the pipeline will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
+ // prepare for next run
+ if (exchange.hasOut()) {
+ exchange.setIn(exchange.getOut());
+ exchange.setOut(null);
}
- log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+ // get the next processor
+ AsyncProcessor processor = processors.next();
- // check for error if so we should break out
- if (!continueProcessing(nextExchange, "so breaking out of pipeline", log)) {
- break;
- }
+ processor.process(exchange, doneSync ->
+ ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+ "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
}
+ else {
+ ExchangeHelper.copyResults(exchange, exchange);
- // logging nextExchange as it contains the exchange that might have altered the payload and since
- // we are logging the completion if will be confusing if we log the original instead
- // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
- log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange);
-
- // copy results back to the original exchange
- ExchangeHelper.copyResults(exchange, nextExchange);
-
- callback.done(true);
- return true;
- }
-
- private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback,
- final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) {
- // this does the actual processing so log at trace level
- log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
- // implement asynchronous routing logic in callback so we can have the callback being
- // triggered and then continue routing where we left
- boolean sync = asyncProcessor.process(exchange, new AsyncCallback() {
- @Override
- public void done(final boolean doneSync) {
- // we only have to handle async completion of the pipeline
- if (doneSync) {
- return;
- }
-
- // continue processing the pipeline asynchronously
- Exchange nextExchange = exchange;
- while (continueRouting(processors, nextExchange)) {
- AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
-
- // check for error if so we should break out
- if (!continueProcessing(nextExchange, "so breaking out of pipeline", log)) {
- break;
- }
-
- nextExchange = createNextExchange(nextExchange);
- boolean isDoneSync = process(original, nextExchange, callback, processors, processor);
- if (!isDoneSync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
- return;
- }
- }
-
- ExchangeHelper.copyResults(original, nextExchange);
- log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original);
- callback.done(false);
- }
- });
-
- return sync;
- }
+ // logging nextExchange as it contains the exchange that might have altered the payload and since
+ // we are logging the completion if will be confusing if we log the original instead
+ // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+ log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- /**
- * Strategy method to create the next exchange from the previous exchange.
- * <p/>
- * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
- *
- * @param previousExchange the previous exchange
- * @return a new exchange
- */
- protected Exchange createNextExchange(Exchange previousExchange) {
- return PipelineHelper.createNextExchange(previousExchange);
+ ReactiveHelper.callback(callback);
+ }
}
- protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
- boolean answer = true;
-
+ protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) {
Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
if (stop != null) {
boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
if (doStop) {
log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
- answer = false;
+ return false;
}
- } else {
- // continue if there are more processors to route
- answer = it.hasNext();
}
-
+ // continue if there are more processors to route
+ boolean answer = it.hasNext();
log.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer);
return answer;
}
@Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startService(processors);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(processors);
+ }
+
+ @Override
public String toString() {
return "Pipeline[" + getProcessors() + "]";
}
+ public List<Processor> getProcessors() {
+ return (List) processors;
+ }
+
@Override
public String getTraceLabel() {
return "pipeline";
@@ -215,4 +169,15 @@ public class Pipeline extends MulticastProcessor {
public void setId(String id) {
this.id = id;
}
+
+ public List<Processor> next() {
+ if (!hasNext()) {
+ return null;
+ }
+ return new ArrayList<>(processors);
+ }
+
+ public boolean hasNext() {
+ return processors != null && !processors.isEmpty();
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 4df33a3..7851ca0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -18,8 +18,6 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,16 +37,17 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.reifier.ErrorHandlerReifier;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.SubUnitOfWorkCallback;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.CamelContextHelper;
-import org.apache.camel.spi.CamelLogger;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
@@ -84,171 +83,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
protected final Processor onPrepareProcessor;
protected final Processor onExceptionProcessor;
- /**
- * Contains the current redelivery data
- */
- protected class RedeliveryData {
- // redelivery state
- Exchange original;
- boolean sync = true;
- int redeliveryCounter;
- long redeliveryDelay;
- Predicate retryWhilePredicate;
- boolean redeliverFromSync;
-
- // default behavior which can be overloaded on a per exception basis
- RedeliveryPolicy currentRedeliveryPolicy;
- Processor failureProcessor;
- Processor onRedeliveryProcessor;
- Processor onExceptionProcessor;
- Predicate handledPredicate;
- Predicate continuedPredicate;
- boolean useOriginalInMessage;
-
- public RedeliveryData() {
- // init with values from the error handler
- this.retryWhilePredicate = retryWhilePolicy;
- this.currentRedeliveryPolicy = redeliveryPolicy;
- this.onRedeliveryProcessor = redeliveryProcessor;
- this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
- this.handledPredicate = getDefaultHandledPredicate();
- this.useOriginalInMessage = useOriginalMessagePolicy;
- }
- }
-
- /**
- * Task for sleeping during redelivery attempts.
- * <p/>
- * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
- * is used for sleeping and trigger redeliveries.
- */
- private final class RedeliverSleepTask {
-
- private final RedeliveryPolicy policy;
- private final long delay;
-
- RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
- this.policy = policy;
- this.delay = delay;
- }
-
- public boolean sleep() throws InterruptedException {
- // for small delays then just sleep
- if (delay < 1000) {
- policy.sleep(delay);
- return true;
- }
-
- StopWatch watch = new StopWatch();
-
- log.debug("Sleeping for: {} millis until attempting redelivery", delay);
- while (watch.taken() < delay) {
- // sleep using 1 sec interval
-
- long delta = delay - watch.taken();
- long max = Math.min(1000, delta);
- if (max > 0) {
- log.trace("Sleeping for: {} millis until waking up for re-check", max);
- Thread.sleep(max);
- }
-
- // are we preparing for shutdown then only do redelivery if allowed
- if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
- log.debug("Rejected redelivery while stopping");
- return false;
- }
- }
-
- return true;
- }
- }
-
- /**
- * Tasks which performs asynchronous redelivery attempts, and being triggered by a
- * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
- * has to be delayed before a redelivery attempt is performed.
- */
- private final class AsyncRedeliveryTask implements Callable<Boolean> {
-
- private final Exchange exchange;
- private final AsyncCallback callback;
- private final RedeliveryData data;
-
- AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
- this.exchange = exchange;
- this.callback = callback;
- this.data = data;
- }
-
- public Boolean call() throws Exception {
- // prepare for redelivery
- prepareExchangeForRedelivery(exchange, data);
-
- // letting onRedeliver be executed at first
- deliverToOnRedeliveryProcessor(exchange, data);
-
- if (log.isTraceEnabled()) {
- log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", exchange.getExchangeId(), outputAsync, exchange);
- }
-
- // emmit event we are doing redelivery
- EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
-
- // process the exchange (also redelivery)
- boolean sync;
- if (data.redeliverFromSync) {
- // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
- // this error handler, which means we have to invoke the callback with false, to have the callback
- // be notified when we are done
- sync = outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
-
- // mark we are in sync mode now
- data.sync = false;
-
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (isDone(exchange)) {
- callback.done(false);
- return;
- }
-
- // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- processAsyncErrorHandler(exchange, callback, data);
- }
- });
- } else {
- // this redelivery task was scheduled from asynchronous, which means we should only
- // handle when the asynchronous task was done
- sync = outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
-
- // this callback should only handle the async case
- if (doneSync) {
- return;
- }
-
- // mark we are in async mode now
- data.sync = false;
-
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (isDone(exchange)) {
- callback.done(doneSync);
- return;
- }
- // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- processAsyncErrorHandler(exchange, callback, data);
- }
- });
- }
-
- return sync;
- }
- }
-
public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy,
@@ -301,6 +135,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
}
+ public void process(Exchange exchange) throws Exception {
+ if (output == null) {
+ // no output then just return
+ return;
+ }
+ awaitManager.process(this, exchange);
+ }
+
+ /**
+ * Process the exchange using redelivery error handling.
+ */
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ // Create the redelivery state object for this exchange
+ RedeliveryState state = new RedeliveryState(exchange, callback);
+ // Run it
+ ReactiveHelper.scheduleMain(state);
+ return false;
+ }
+
/**
* Allows to change the output of the error handler which are used when optimising the
* JMX instrumentation to use either an advice or wrapped processor when calling a processor.
@@ -332,61 +185,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
return answer;
}
- protected boolean isRunAllowed(RedeliveryData data) {
- // if camel context is forcing a shutdown then do not allow running
- boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
- if (forceShutdown) {
- log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
- return false;
- }
-
- // redelivery policy can control if redelivery is allowed during stopping/shutdown
- // but this only applies during a redelivery (counter must > 0)
- if (data.redeliveryCounter > 0) {
- if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
- log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
- return true;
- } else if (preparingShutdown) {
- // we are preparing for shutdown, now determine if we can still run
- boolean answer = isRunAllowedOnPreparingShutdown();
- log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
- return answer;
- }
- }
-
- // we cannot run if we are stopping/stopped
- boolean answer = !isStoppingOrStopped();
- log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
- return answer;
- }
-
protected boolean isRunAllowedOnPreparingShutdown() {
return false;
}
- protected boolean isRedeliveryAllowed(RedeliveryData data) {
- // redelivery policy can control if redelivery is allowed during stopping/shutdown
- // but this only applies during a redelivery (counter must > 0)
- if (data.redeliveryCounter > 0) {
- boolean stopping = isStoppingOrStopped();
- if (!preparingShutdown && !stopping) {
- log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
- return true;
- } else {
- // we are stopping or preparing to shutdown
- if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
- log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
- return true;
- } else {
- log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
- return false;
- }
- }
- }
-
- return true;
- }
-
@Override
public void prepareShutdown(boolean suspendOnly, boolean forced) {
// prepare for shutdown, eg do not allow redelivery if configured
@@ -394,202 +196,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
preparingShutdown = true;
}
- public void process(Exchange exchange) throws Exception {
- if (output == null) {
- // no output then just return
- return;
- }
-
- // inline org.apache.camel.support.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
- // to optimize and reduce stacktrace lengths
- final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- awaitManager.countDown(exchange, latch);
- }
- }
- });
- if (!sync) {
- awaitManager.await(exchange, latch);
- }
- }
-
- /**
- * Process the exchange using redelivery error handling.
- */
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- final RedeliveryData data = new RedeliveryData();
-
- // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
- // original Exchange is being redelivered, and not a mutated Exchange
- data.original = defensiveCopyExchangeIfNeeded(exchange);
-
- // use looping to have redelivery attempts
- while (true) {
-
- // can we still run
- if (!isRunAllowed(data)) {
- log.trace("Run not allowed, will reject executing exchange: {}", exchange);
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- }
- // we cannot process so invoke callback
- callback.done(data.sync);
- return data.sync;
- }
-
- // did previous processing cause an exception?
- boolean handle = shouldHandleException(exchange);
- if (handle) {
- handleException(exchange, data, isDeadLetterChannel());
- onExceptionOccurred(exchange, data);
- }
-
- // compute if we are exhausted, and whether redelivery is allowed
- boolean exhausted = isExhausted(exchange, data);
- boolean redeliverAllowed = isRedeliveryAllowed(data);
-
- // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
- if (!redeliverAllowed || exhausted) {
- Processor target = null;
- boolean deliver = true;
-
- // the unit of work may have an optional callback associated we need to leverage
- SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
- if (uowCallback != null) {
- // signal to the callback we are exhausted
- uowCallback.onExhausted(exchange);
- // do not deliver to the failure processor as its been handled by the callback instead
- deliver = false;
- }
-
- if (deliver) {
- // should deliver to failure processor (either from onException or the dead letter channel)
- target = data.failureProcessor != null ? data.failureProcessor : deadLetter;
- }
- // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
- // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
- boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == deadLetter);
- boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
- // we are breaking out
- return sync;
- }
-
- if (data.redeliveryCounter > 0) {
- // calculate delay
- data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
-
- if (data.redeliveryDelay > 0) {
- // okay there is a delay so create a scheduled task to have it executed in the future
-
- if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
-
- // we are doing a redelivery then a thread pool must be configured (see the doStart method)
- ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
-
- // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
- // have it being executed in the future, or immediately
- // we are continuing asynchronously
-
- // mark we are routing async from now and that this redelivery task came from a synchronous routing
- data.sync = false;
- data.redeliverFromSync = true;
- AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
-
- // schedule the redelivery task
- if (log.isTraceEnabled()) {
- log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
- }
- executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
-
- return false;
- } else {
- // async delayed redelivery was disabled or we are transacted so we must be synchronous
- // as the transaction manager requires to execute in the same thread context
- try {
- // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
- redeliverySleepCounter.incrementAndGet();
- RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
- boolean complete = task.sleep();
- redeliverySleepCounter.decrementAndGet();
- if (!complete) {
- // the task was rejected
- exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
- // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
- exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- // jump to start of loop which then detects that we are failed and exhausted
- continue;
- }
- } catch (InterruptedException e) {
- redeliverySleepCounter.decrementAndGet();
- // we was interrupted so break out
- exchange.setException(e);
- // mark the exchange to stop continue routing when interrupted
- // as we do not want to continue routing (for example a task has been cancelled)
- exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
- callback.done(data.sync);
- return data.sync;
- }
- }
- }
-
- // prepare for redelivery
- prepareExchangeForRedelivery(exchange, data);
-
- // letting onRedeliver be executed
- deliverToOnRedeliveryProcessor(exchange, data);
-
- // emmit event we are doing redelivery
- EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
- }
-
- // process the exchange (also redelivery)
- boolean sync = outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // this callback should only handle the async case
- if (sync) {
- return;
- }
-
- // mark we are in async mode now
- data.sync = false;
-
- // if we are done then notify callback and exit
- if (isDone(exchange)) {
- callback.done(sync);
- return;
- }
-
- // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- // method which takes care of this in a asynchronous manner
- processAsyncErrorHandler(exchange, callback, data);
- }
- });
-
- if (!sync) {
- // the remainder of the Exchange is being processed asynchronously so we should return
- return false;
- }
- // we continue to route synchronously
-
- // if we are done then notify callback and exit
- boolean done = isDone(exchange);
- if (done) {
- callback.done(true);
- return true;
- }
-
- // error occurred so loop back around.....
- }
- }
-
/**
* <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
* and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
*
- * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
- * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
+ * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryState#redeliveryDelay}
+ * and {@link RedeliveryState#redeliveryCounter} are copied in.</p>
*
* @param exchange The current exchange in question.
* @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
@@ -610,91 +222,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
/**
- * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
- * <p/>
- * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
- * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
- * in terms of logic.
- */
- protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
- // can we still run
- if (!isRunAllowed(data)) {
- log.trace("Run not allowed, will reject executing exchange: {}", exchange);
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- }
- callback.done(data.sync);
- return;
- }
-
- // did previous processing cause an exception?
- boolean handle = shouldHandleException(exchange);
- if (handle) {
- handleException(exchange, data, isDeadLetterChannel());
- onExceptionOccurred(exchange, data);
- }
-
- // compute if we are exhausted or not
- boolean exhausted = isExhausted(exchange, data);
- if (exhausted) {
- Processor target = null;
- boolean deliver = true;
-
- // the unit of work may have an optional callback associated we need to leverage
- UnitOfWork uow = exchange.getUnitOfWork();
- if (uow != null) {
- SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
- if (uowCallback != null) {
- // signal to the callback we are exhausted
- uowCallback.onExhausted(exchange);
- // do not deliver to the failure processor as its been handled by the callback instead
- deliver = false;
- }
- }
-
- if (deliver) {
- // should deliver to failure processor (either from onException or the dead letter channel)
- target = data.failureProcessor != null ? data.failureProcessor : deadLetter;
- }
- // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
- // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
- boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter;
- deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
- // we are breaking out
- return;
- }
-
- if (data.redeliveryCounter > 0) {
- // we are doing a redelivery then a thread pool must be configured (see the doStart method)
- ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
-
- // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
- // have it being executed in the future, or immediately
- // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
- // to ensure the callback will continue routing from where we left
- AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
-
- // calculate the redelivery delay
- data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
-
- if (data.redeliveryDelay > 0) {
- // schedule the redelivery task
- if (log.isTraceEnabled()) {
- log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
- }
- executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
- } else {
- // execute the task immediately
- executorService.submit(task);
- }
- }
- }
-
- /**
- * Performs a defensive copy of the exchange if needed
- *
- * @param exchange the exchange
- * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
+ * Performs a defensive copy of the exchange if needed
+ *
+ * @param exchange the exchange
+ * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
*/
protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
// only do a defensive copy if redelivery is enabled
@@ -790,292 +321,486 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
return null;
}
- protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
- Exception caught = exchange.getException();
-
- // we continue so clear any exceptions
- exchange.setException(null);
- // clear rollback flags
- exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
-
- // its continued then remove traces of redelivery attempted and caught exception
- exchange.getIn().removeHeader(Exchange.REDELIVERED);
- exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
- exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
- exchange.removeProperty(Exchange.FAILURE_HANDLED);
- // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
-
- // create log message
- String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
- msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
- msg = msg + ". Handled and continue routing.";
-
- // log that we failed but want to continue
- logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null);
- }
+ /**
+ * Contains the current redelivery state
+ */
+ protected class RedeliveryState implements Runnable {
+ Exchange original;
+ Exchange exchange;
+ AsyncCallback callback;
+ boolean sync = true;
+ int redeliveryCounter;
+ long redeliveryDelay;
+ Predicate retryWhilePredicate;
+ boolean redeliverFromSync;
+
+ // default behavior which can be overloaded on a per exception basis
+ RedeliveryPolicy currentRedeliveryPolicy;
+ Processor failureProcessor;
+ Processor onRedeliveryProcessor;
+ Processor onExceptionProcessor;
+ Predicate handledPredicate;
+ Predicate continuedPredicate;
+ boolean useOriginalInMessage;
+
+ public RedeliveryState(Exchange exchange, AsyncCallback callback) {
+ // init with values from the error handler
+ this.retryWhilePredicate = retryWhilePolicy;
+ this.currentRedeliveryPolicy = redeliveryPolicy;
+ this.handledPredicate = getDefaultHandledPredicate();
+ this.useOriginalInMessage = useOriginalMessagePolicy;
+ this.onRedeliveryProcessor = redeliveryProcessor;
+ this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
- protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
- if (!redeliveryEnabled) {
- throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
+ // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
+ // original Exchange is being redelivered, and not a mutated Exchange
+ this.original = defensiveCopyExchangeIfNeeded(exchange);
+ this.exchange = exchange;
+ this.callback = callback;
}
- // there must be a defensive copy of the exchange
- ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
- // okay we will give it another go so clear the exception so we can try again
- exchange.setException(null);
+ public String toString() {
+ return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]";
+ }
- // clear rollback flags
- exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+ /**
+ * Redelivery logic.
+ */
+ public void run() {
+ // can we still run
+ if (!isRunAllowed()) {
+ log.trace("Run not allowed, will reject executing exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(false);
+ return;
+ }
- // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
- // and then put these on the exchange when doing a redelivery / fault processor
+ // did previous processing cause an exception?
+ boolean handle = shouldHandleException(exchange);
+ if (handle) {
+ handleException();
+ onExceptionOccurred();
+ }
- // preserve these headers
- Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
- Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
- Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
+ // compute if we are exhausted or not
+ boolean exhausted = isExhausted(exchange);
+ boolean redeliverAllowed = isRedeliveryAllowed();
- // we are redelivering so copy from original back to exchange
- exchange.getIn().copyFrom(data.original.getIn());
- exchange.setOut(null);
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
+ // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
+ if (!redeliverAllowed || exhausted) {
+ Processor target = null;
+ boolean deliver = true;
- // put back headers
- if (redeliveryCounter != null) {
- exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
- }
- if (redeliveryMaxCounter != null) {
- exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
- }
- if (redelivered != null) {
- exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
- }
- }
+ // the unit of work may have an optional callback associated we need to leverage
+ UnitOfWork uow = exchange.getUnitOfWork();
+ if (uow != null) {
+ SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
+ if (uowCallback != null) {
+ // signal to the callback we are exhausted
+ uowCallback.onExhausted(exchange);
+ // do not deliver to the failure processor as its been handled by the callback instead
+ deliver = false;
+ }
+ }
- protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
- Exception e = exchange.getException();
- // e is never null
-
- Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
- if (previous != null && previous != e) {
- // a 2nd exception was thrown while handling a previous exception
- // so we need to add the previous as suppressed by the new exception
- // see also FatalFallbackErrorHandler
- Throwable[] suppressed = e.getSuppressed();
- boolean found = false;
- for (Throwable t : suppressed) {
- if (t == previous) {
- found = true;
+ if (deliver) {
+ // should deliver to failure processor (either from onException or the dead letter channel)
+ target = failureProcessor != null ? failureProcessor : deadLetter;
}
+ // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
+ // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
+ boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter;
+ deliverToFailureProcessor(target, isDeadLetterChannel, exchange);
+ // we are breaking out
}
- if (!found) {
- e.addSuppressed(previous);
- }
- }
+ else if (redeliveryCounter > 0) {
+ // calculate the redelivery delay
+ redeliveryDelay = determineRedeliveryDelay(exchange, currentRedeliveryPolicy, redeliveryDelay, redeliveryCounter);
- // store the original caused exception in a property, so we can restore it later
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+ if (redeliveryDelay > 0) {
+ // okay there is a delay so create a scheduled task to have it executed in the future
- // find the error handler to use (if any)
- OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
- if (exceptionPolicy != null) {
- data.currentRedeliveryPolicy = ErrorHandlerReifier.createRedeliveryPolicy(exceptionPolicy, exchange.getContext(), data.currentRedeliveryPolicy);
- data.handledPredicate = exceptionPolicy.getHandledPolicy();
- data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
- data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
- data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
+ if (currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
- // route specific failure handler?
- Processor processor = null;
- UnitOfWork uow = exchange.getUnitOfWork();
- if (uow != null && uow.getRouteContext() != null) {
- String routeId = uow.getRouteContext().getRoute().getId();
- processor = exceptionPolicy.getErrorHandler(routeId);
- } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
- // note this should really not happen, but we have this code as a fail safe
- // to be backwards compatible with the old behavior
- log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
- processor = exceptionPolicy.getErrorHandlers().iterator().next();
- }
- if (processor != null) {
- data.failureProcessor = processor;
- }
+ // we are doing a redelivery then a thread pool must be configured (see the doStart method)
+ ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
- // route specific on redelivery?
- processor = exceptionPolicy.getOnRedelivery();
- if (processor != null) {
- data.onRedeliveryProcessor = processor;
+ // schedule the redelivery task
+ if (log.isTraceEnabled()) {
+ log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
+ }
+ executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+
+ } else {
+ // async delayed redelivery was disabled or we are transacted so we must be synchronous
+ // as the transaction manager requires to execute in the same thread context
+ try {
+ // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
+ redeliverySleepCounter.incrementAndGet();
+ boolean complete = sleep();
+ redeliverySleepCounter.decrementAndGet();
+ if (!complete) {
+ // the task was rejected
+ exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
+ // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
+ exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+ // jump to start of loop which then detects that we are failed and exhausted
+ ReactiveHelper.schedule(this);
+ } else {
+ ReactiveHelper.schedule(this::redeliver);
+ }
+ } catch (InterruptedException e) {
+ redeliverySleepCounter.decrementAndGet();
+ // we was interrupted so break out
+ exchange.setException(e);
+ // mark the exchange to stop continue routing when interrupted
+ // as we do not want to continue routing (for example a task has been cancelled)
+ exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
+ ReactiveHelper.callback(callback);
+ }
+ }
+ } else {
+ // execute the task immediately
+ ReactiveHelper.schedule(this::redeliver);
+ }
}
- // route specific on exception occurred?
- processor = exceptionPolicy.getOnExceptionOccurred();
- if (processor != null) {
- data.onExceptionProcessor = processor;
+ else {
+ // Simple delivery
+ outputAsync.process(exchange, doneSync -> {
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (isDone(exchange)) {
+ ReactiveHelper.callback(callback);
+ } else {
+ // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ ReactiveHelper.schedule(this);
+ }
+ });
}
}
- // only log if not failure handled or not an exhausted unit of work
- if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
- String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
- + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
- logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
- }
+ protected boolean isRunAllowed() {
+ // if camel context is forcing a shutdown then do not allow running
+ boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(RedeliveryErrorHandler.this);
+ if (forceShutdown) {
+ log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
+ return false;
+ }
- data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
- }
+ // redelivery policy can control if redelivery is allowed during stopping/shutdown
+ // but this only applies during a redelivery (counter must > 0)
+ if (redeliveryCounter > 0) {
+ if (currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
+ log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
+ return true;
+ } else if (preparingShutdown) {
+ // we are preparing for shutdown, now determine if we can still run
+ boolean answer = isRunAllowedOnPreparingShutdown();
+ log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
+ return answer;
+ }
+ }
- /**
- * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
- * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
- */
- protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) {
- if (data.onExceptionProcessor == null) {
- return;
+ // we cannot run if we are stopping/stopped
+ boolean answer = !isStoppingOrStopped();
+ log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
+ return answer;
}
- // run this synchronously as its just a Processor
- try {
- if (log.isTraceEnabled()) {
- log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange);
+ protected boolean isRedeliveryAllowed() {
+ // redelivery policy can control if redelivery is allowed during stopping/shutdown
+ // but this only applies during a redelivery (counter must > 0)
+ if (redeliveryCounter > 0) {
+ boolean stopping = isStoppingOrStopped();
+ if (!preparingShutdown && !stopping) {
+ log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
+ return true;
+ } else {
+ // we are stopping or preparing to shutdown
+ if (currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
+ log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
+ return true;
+ } else {
+ log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
+ return false;
+ }
+ }
}
- data.onExceptionProcessor.process(exchange);
- } catch (Throwable e) {
- // we dont not want new exception to override existing, so log it as a WARN
- log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
- }
- log.trace("OnExceptionOccurred processor done");
- }
- /**
- * Gives an optional configured redelivery processor a chance to process before the Exchange
- * will be redelivered. This can be used to alter the Exchange.
- */
- protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
- if (data.onRedeliveryProcessor == null) {
- return;
+ return true;
}
- if (log.isTraceEnabled()) {
- log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
- data.onRedeliveryProcessor, exchange);
- }
+ protected void redeliver() {
+ // prepare for redelivery
+ prepareExchangeForRedelivery();
- // run this synchronously as its just a Processor
- try {
- data.onRedeliveryProcessor.process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
- log.trace("Redelivery processor done");
- }
+ // letting onRedeliver be executed at first
+ deliverToOnRedeliveryProcessor();
- /**
- * All redelivery attempts failed so move the exchange to the dead letter queue
- */
- protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
- final RedeliveryData data, final AsyncCallback callback) {
- boolean sync = true;
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", exchange.getExchangeId(), outputAsync, exchange);
+ }
+
+ // emmit event we are doing redelivery
+ EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, redeliveryCounter);
- Exception caught = exchange.getException();
+ // process the exchange (also redelivery)
+ outputAsync.process(exchange, doneSync -> {
+ log.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
- // we did not success with the redelivery so now we let the failure processor handle it
- // clear exception as we let the failure processor handle it
- exchange.setException(null);
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (isDone(exchange)) {
+ ReactiveHelper.callback(callback);
+ return;
+ } else {
+ // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ ReactiveHelper.schedule(this);
+ }
+ });
+ }
- final boolean shouldHandle = shouldHandle(exchange, data);
- final boolean shouldContinue = shouldContinue(exchange, data);
+ protected void prepareExchangeForContinue(Exchange exchange, boolean isDeadLetterChannel) {
+ Exception caught = exchange.getException();
- // regard both handled or continued as being handled
- boolean handled = false;
+ // we continue so clear any exceptions
+ exchange.setException(null);
+ // clear rollback flags
+ exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
- // always handle if dead letter channel
- boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
- if (handleOrContinue) {
- // its handled then remove traces of redelivery attempted
+ // its continued then remove traces of redelivery attempted and caught exception
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ exchange.removeProperty(Exchange.FAILURE_HANDLED);
+ // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
- // and remove traces of rollback only and uow exhausted markers
- exchange.removeProperty(Exchange.ROLLBACK_ONLY);
- exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
+ // create log message
+ String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+ msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught;
+ msg = msg + ". Handled and continue routing.";
- handled = true;
- } else {
- // must decrement the redelivery counter as we didn't process the redelivery but is
- // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
- decrementRedeliveryCounter(exchange);
+ // log that we failed but want to continue
+ logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, null);
}
- // we should allow using the failure processor if we should not continue
- // or in case of continue then the failure processor is NOT a dead letter channel
- // because you can continue and still let the failure processor do some routing
- // before continue in the main route.
- boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
-
- if (allowFailureProcessor && processor != null) {
-
- // prepare original IN body if it should be moved instead of current body
- if (data.useOriginalInMessage) {
- log.trace("Using the original IN message instead of current");
- Message original = ExchangeHelper.getOriginalInMessage(exchange);
- exchange.setIn(original);
- if (exchange.hasOut()) {
- log.trace("Removing the out message to avoid some uncertain behavior");
- exchange.setOut(null);
- }
+ protected void prepareExchangeForRedelivery() {
+ if (!redeliveryEnabled) {
+ throw new IllegalStateException("Redelivery is not enabled on " + RedeliveryErrorHandler.this + ". Make sure you have configured the error handler properly.");
}
+ // there must be a defensive copy of the exchange
+ ObjectHelper.notNull(this.original, "Defensive copy of Exchange is null", RedeliveryErrorHandler.this);
+
+ // okay we will give it another go so clear the exception so we can try again
+ exchange.setException(null);
+
+ // clear rollback flags
+ exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+ // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
+ // and then put these on the exchange when doing a redelivery / fault processor
+
+ // preserve these headers
+ Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+ Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
+ Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
+
+ // we are redelivering so copy from original back to exchange
+ exchange.getIn().copyFrom(this.original.getIn());
+ exchange.setOut(null);
// reset cached streams so they can be read again
MessageHelper.resetStreamCache(exchange.getIn());
- // invoke custom on prepare
- if (onPrepareProcessor != null) {
- try {
- log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
- onPrepareProcessor.process(exchange);
- } catch (Exception e) {
- // a new exception was thrown during prepare
- exchange.setException(e);
+ // put back headers
+ if (redeliveryCounter != null) {
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
+ }
+ if (redeliveryMaxCounter != null) {
+ exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
+ }
+ if (redelivered != null) {
+ exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
+ }
+ }
+
+ protected void handleException() {
+ Exception e = exchange.getException();
+ // e is never null
+
+ Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ if (previous != null && previous != e) {
+ // a 2nd exception was thrown while handling a previous exception
+ // so we need to add the previous as suppressed by the new exception
+ // see also FatalFallbackErrorHandler
+ Throwable[] suppressed = e.getSuppressed();
+ boolean found = false;
+ for (Throwable t : suppressed) {
+ if (t == previous) {
+ found = true;
+ }
+ }
+ if (!found) {
+ e.addSuppressed(previous);
}
}
- log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
+ // store the original caused exception in a property, so we can restore it later
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+
+ // find the error handler to use (if any)
+ OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
+ if (exceptionPolicy != null) {
+ currentRedeliveryPolicy = ErrorHandlerReifier.createRedeliveryPolicy(exceptionPolicy, exchange.getContext(), currentRedeliveryPolicy);
+ handledPredicate = exceptionPolicy.getHandledPolicy();
+ continuedPredicate = exceptionPolicy.getContinuedPolicy();
+ retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
+ useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
+
+ // route specific failure handler?
+ Processor processor = null;
+ UnitOfWork uow = exchange.getUnitOfWork();
+ if (uow != null && uow.getRouteContext() != null) {
+ String routeId = uow.getRouteContext().getRoute().getId();
+ processor = exceptionPolicy.getErrorHandler(routeId);
+ } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
+ // note this should really not happen, but we have this code as a fail safe
+ // to be backwards compatible with the old behavior
+ log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
+ processor = exceptionPolicy.getErrorHandlers().iterator().next();
+ }
+ if (processor != null) {
+ failureProcessor = processor;
+ }
- // store the last to endpoint as the failure endpoint
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- // and store the route id so we know in which route we failed
- UnitOfWork uow = exchange.getUnitOfWork();
- if (uow != null && uow.getRouteContext() != null) {
- exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+ // route specific on redelivery?
+ processor = exceptionPolicy.getOnRedelivery();
+ if (processor != null) {
+ onRedeliveryProcessor = processor;
+ }
+ // route specific on exception occurred?
+ processor = exceptionPolicy.getOnExceptionOccurred();
+ if (processor != null) {
+ onExceptionProcessor = processor;
+ }
}
- // fire event as we had a failure processor to handle it, which there is a event for
- final boolean deadLetterChannel = processor == deadLetter;
+ // only log if not failure handled or not an exhausted unit of work
+ if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
+ String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
+ + ". On delivery attempt: " + redeliveryCounter + " caught: " + e;
+ logFailedDelivery(true, false, false, false, isDeadLetterChannel(), exchange, msg, e);
+ }
- EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+ redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+ }
- // the failure processor could also be asynchronous
- AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
- sync = afp.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
- try {
- prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
- // fire event as we had a failure processor to handle it, which there is a event for
- EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
- } finally {
- // if the fault was handled asynchronously, this should be reflected in the callback as well
- data.sync &= sync;
- callback.done(data.sync);
- }
+ /**
+ * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
+ * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
+ */
+ protected void onExceptionOccurred() {
+ if (onExceptionProcessor == null) {
+ return;
+ }
+
+ // run this synchronously as its just a Processor
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", onExceptionProcessor, exchange);
}
- });
- } else {
+ onExceptionProcessor.process(exchange);
+ } catch (Throwable e) {
+ // we dont not want new exception to override existing, so log it as a WARN
+ log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
+ }
+ log.trace("OnExceptionOccurred processor done");
+ }
+
+ /**
+ * Gives an optional configured redelivery processor a chance to process before the Exchange
+ * will be redelivered. This can be used to alter the Exchange.
+ */
+ protected void deliverToOnRedeliveryProcessor() {
+ if (onRedeliveryProcessor == null) {
+ return;
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
+ onRedeliveryProcessor, exchange);
+ }
+
+ // run this synchronously as its just a Processor
try {
+ onRedeliveryProcessor.process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ log.trace("Redelivery processor done");
+ }
+
+ /**
+ * All redelivery attempts failed so move the exchange to the dead letter queue
+ */
+ protected void deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange) {
+ Exception caught = exchange.getException();
+
+ // we did not success with the redelivery so now we let the failure processor handle it
+ // clear exception as we let the failure processor handle it
+ exchange.setException(null);
+
+ final boolean shouldHandle = shouldHandle(exchange);
+ final boolean shouldContinue = shouldContinue(exchange);
+
+ // regard both handled or continued as being handled
+ boolean handled = false;
+
+ // always handle if dead letter channel
+ boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
+ if (handleOrContinue) {
+ // its handled then remove traces of redelivery attempted
+ exchange.getIn().removeHeader(Exchange.REDELIVERED);
+ exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+ exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
+ exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
+ // and remove traces of rollback only and uow exhausted markers
+ exchange.removeProperty(Exchange.ROLLBACK_ONLY);
+ exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
+
+ handled = true;
+ } else {
+ // must decrement the redelivery counter as we didn't process the redelivery but is
+ // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
+ decrementRedeliveryCounter(exchange);
+ }
+
+ // we should allow using the failure processor if we should not continue
+ // or in case of continue then the failure processor is NOT a dead letter channel
+ // because you can continue and still let the failure processor do some routing
+ // before continue in the main route.
+ boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
+
+ if (allowFailureProcessor && processor != null) {
+
+ // prepare original IN body if it should be moved instead of current body
+ if (useOriginalInMessage) {
+ log.trace("Using the original IN message instead of current");
+ Message original = ExchangeHelper.getOriginalInMessage(exchange);
+ exchange.setIn(original);
+ if (exchange.hasOut()) {
+ log.trace("Removing the out message to avoid some uncertain behavior");
+ exchange.setOut(null);
+ }
+ }
+
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
// invoke custom on prepare
if (onPrepareProcessor != null) {
try {
@@ -1086,320 +811,390 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
exchange.setException(e);
}
}
- // no processor but we need to prepare after failure as well
- prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
- } finally {
- // callback we are done
- callback.done(data.sync);
- }
- }
- // create log message
- String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
- msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
- if (processor != null) {
- if (isDeadLetterChannel && deadLetterUri != null) {
- msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
+ log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
+
+ // store the last to endpoint as the failure endpoint
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ // and store the route id so we know in which route we failed
+ UnitOfWork uow = exchange.getUnitOfWork();
+ if (uow != null && uow.getRouteContext() != null) {
+ exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+ }
+
+ // fire event as we had a failure processor to handle it, which there is a event for
+ final boolean deadLetterChannel = processor == deadLetter;
+
+ EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+
+ // the failure processor could also be asynchronous
+ AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
+ afp.process(exchange, sync -> {
+ log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
+ try {
+ prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
+ // fire event as we had a failure processor to handle it, which there is a event for
+ EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+ } finally {
+ // if the fault was handled asynchronously, this should be reflected in the callback as well
+ ReactiveHelper.callback(callback);
+ }
+ });
} else {
- msg = msg + ". Processed by failure processor: " + processor;
+ try {
+ // invoke custom on prepare
+ if (onPrepareProcessor != null) {
+ try {
+ log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
+ onPrepareProcessor.process(exchange);
+ } catch (Exception e) {
+ // a new exception was thrown during prepare
+ exchange.setException(e);
+ }
+ }
+ // no processor but we need to prepare after failure as well
+ prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
+ } finally {
+ // callback we are done
+ ReactiveHelper.callback(callback);
+ }
}
- }
- // log that we failed delivery as we are exhausted
- logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null);
+ // create log message
+ String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+ msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught;
+ if (processor != null) {
+ if (isDeadLetterChannel && deadLetterUri != null) {
+ msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
+ } else {
+ msg = msg + ". Processed by failure processor: " + processor;
+ }
+ }
- return sync;
- }
+ // log that we failed delivery as we are exhausted
+ logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, null);
+ }
- protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
- final boolean shouldHandle, final boolean shouldContinue) {
+ protected void prepareExchangeAfterFailure(final Exchange exchange, final boolean isDeadLetterChannel,
+ final boolean shouldHandle, final boolean shouldContinue) {
- Exception newException = exchange.getException();
+ Exception newException = exchange.getException();
- // we could not process the exchange so we let the failure processor handled it
- ExchangeHelper.setFailureHandled(exchange);
+ // we could not process the exchange so we let the failure processor handled it
+ ExchangeHelper.setFailureHandled(exchange);
- // honor if already set a handling
- boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
- if (alreadySet) {
- boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
- log.trace("This exchange has already been marked for handling: {}", handled);
- if (!handled) {
- // exception not handled, put exception back in the exchange
- exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
- // and put failure endpoint back as well
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ // honor if already set a handling
+ boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
+ if (alreadySet) {
+ boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
+ log.trace("This exchange has already been marked for handling: {}", handled);
+ if (!handled) {
+ // exception not handled, put exception back in the exchange
+ exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+ // and put failure endpoint back as well
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
+ return;
}
- return;
- }
- // dead letter channel is special
- if (shouldContinue) {
- log.trace("This exchange is continued: {}", exchange);
- // okay we want to continue then prepare the exchange for that as well
- prepareExchangeForContinue(exchange, data, isDeadLetterChannel);
- } else if (shouldHandle) {
- log.trace("This exchange is handled so its marked as not failed: {}", exchange);
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
- } else {
- // okay the redelivery policy are not explicit set to true, so we should allow to check for some
- // special situations when using dead letter channel
- if (isDeadLetterChannel) {
-
- // DLC is always handling the first thrown exception,
- // but if its a new exception then use the configured option
- boolean handled = newException == null || deadLetterHandleNewException;
-
- // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
- // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
- // to avoid causing endless poison messages that fails forever)
- if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
- String uri = URISupport.sanitizeUri(deadLetterUri);
- String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
+ // dead letter channel is special
+ if (shouldContinue) {
+ log.trace("This exchange is continued: {}", exchange);
+ // okay we want to continue then prepare the exchange for that as well
+ prepareExchangeForContinue(exchange, isDeadLetterChannel);
+ } else if (shouldHandle) {
+ log.trace("This exchange is handled so its marked as not failed: {}", exchange);
+ exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
+ } else {
+ // okay the redelivery policy are not explicit set to true, so we should allow to check for some
+ // special situations when using dead letter channel
+ if (isDeadLetterChannel) {
+
+ // DLC is always handling the first thrown exception,
+ // but if its a new exception then use the configured option
+ boolean handled = newException == null || deadLetterHandleNewException;
+
+ // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
+ // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
+ // to avoid causing endless poison messages that fails forever)
+ if (newException != null && currentRedeliveryPolicy.isLogNewException()) {
+ String uri = URISupport.sanitizeUri(deadLetterUri);
+ String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
+ if (handled) {
+ msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
+ } else {
+ msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
+ }
+ logFailedDelivery(false, true, handled, false, true, exchange, msg, newException);
+ }
+
if (handled) {
- msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
- } else {
- msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
+ log.trace("This exchange is handled so its marked as not failed: {}", exchange);
+ exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
+ return;
}
- logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException);
}
- if (handled) {
- log.trace("This exchange is handled so its marked as not failed: {}", exchange);
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
- return;
- }
+ // not handled by default
+ prepareExchangeAfterFailureNotHandled(exchange);
}
-
- // not handled by default
- prepareExchangeAfterFailureNotHandled(exchange);
}
- }
-
- private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
- log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
- // exception not handled, put exception back in the exchange
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
- exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
- // and put failure endpoint back as well
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- // and store the route id so we know in which route we failed
- UnitOfWork uow = exchange.getUnitOfWork();
- if (uow != null && uow.getRouteContext() != null) {
- exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
- }
- }
- private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
- Exchange exchange, String message, RedeliveryData data, Throwable e) {
- if (logger == null) {
- return;
+ private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
+ log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
+ // exception not handled, put exception back in the exchange
+ exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
+ exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+ // and put failure endpoint back as well
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ // and store the route id so we know in which route we failed
+ UnitOfWork uow = exchange.getUnitOfWork();
+ if (uow != null && uow.getRouteContext() != null) {
+ exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+ }
}
- if (!exchange.isRollbackOnly()) {
- if (newException && !data.currentRedeliveryPolicy.isLogNewException()) {
- // do not log new exception
+ private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
+ Exchange exchange, String message, Throwable e) {
+ if (logger == null) {
return;
}
- // if we should not rollback, then check whether logging is enabled
+ if (!exchange.isRollbackOnly()) {
+ if (newException && !currentRedeliveryPolicy.isLogNewException()) {
+ // do not log new exception
+ return;
+ }
- if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) {
- // do not log handled
- return;
- }
+ // if we should not rollback, then check whether logging is enabled
- if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) {
- // do not log handled
- return;
- }
+ if (!newException && handled && !currentRedeliveryPolicy.isLogHandled()) {
+ // do not log handled
+ return;
+ }
- if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
- // do not log retry attempts
- return;
- }
+ if (!newException && continued && !currentRedeliveryPolicy.isLogContinued()) {
+ // do not log handled
+ return;
+ }
- if (!newException && shouldRedeliver) {
- if (data.currentRedeliveryPolicy.isLogRetryAttempted()) {
- if ((data.currentRedeliveryPolicy.getRetryAttemptedLogInterval() > 1) && (data.redeliveryCounter % data.currentRedeliveryPolicy.getRetryAttemptedLogInterval()) != 0) {
- // do not log retry attempt because it is excluded by the retryAttemptedLogInterval
+ if (!newException && shouldRedeliver && !currentRedeliveryPolicy.isLogRetryAttempted()) {
+ // do not log retry attempts
+ return;
+ }
+
+ if (!newException && shouldRedeliver) {
+ if (currentRedeliveryPolicy.isLogRetryAttempted()) {
+ if ((currentRedeliveryPolicy.getRetryAttemptedLogInterval() > 1) && (redeliveryCounter % currentRedeliveryPolicy.getRetryAttemptedLogInterval()) != 0) {
+ // do not log retry attempt because it is excluded by the retryAttemptedLogInterval
+ return;
+ }
+ } else {
+ // do not log retry attempts
return;
}
- } else {
- // do not log retry attempts
+ }
+
+ if (!newException && !shouldRedeliver && !currentRedeliveryPolicy.isLogExhausted()) {
+ // do not log exhausted
return;
}
}
- if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
- // do not log exhausted
- return;
+ LoggingLevel newLogLevel;
+ boolean logStackTrace;
+ if (exchange.isRollbackOnly()) {
+ newLogLevel = currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+ logStackTrace = currentRedeliveryPolicy.isLogStackTrace();
+ } else if (shouldRedeliver) {
+ newLogLevel = currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+ logStackTrace = currentRedeliveryPolicy.isLogRetryStackTrace();
+ } else {
+ newLogLevel = currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+ logStackTrace = currentRedeliveryPolicy.isLogStackTrace();
+ }
+ if (e == null) {
+ e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
}
- }
- LoggingLevel newLogLevel;
- boolean logStackTrace;
- if (exchange.isRollbackOnly()) {
- newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
- logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
- } else if (shouldRedeliver) {
- newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
- logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
- } else {
- newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
- logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
- }
- if (e == null) {
- e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
- }
+ if (newException) {
+ // log at most WARN level
+ if (newLogLevel == LoggingLevel.ERROR) {
+ newLogLevel = LoggingLevel.WARN;
+ }
+ String msg = message;
+ if (msg == null) {
+ msg = "New exception " + ExchangeHelper.logIds(exchange);
+ // special for logging the new exception
+ Throwable cause = e;
+ if (cause != null) {
+ msg = msg + " due: " + cause.getMessage();
+ }
+ }
- if (newException) {
- // log at most WARN level
- if (newLogLevel == LoggingLevel.ERROR) {
- newLogLevel = LoggingLevel.WARN;
- }
- String msg = message;
- if (msg == null) {
- msg = "New exception " + ExchangeHelper.logIds(exchange);
- // special for logging the new exception
- Throwable cause = e;
+ if (e != null && logStackTrace) {
+ logger.log(msg, e, newLogLevel);
+ } else {
+ logger.log(msg, newLogLevel);
+ }
+ } else if (exchange.isRollbackOnly()) {
+ String msg = "Rollback " + ExchangeHelper.logIds(exchange);
+ Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
if (cause != null) {
msg = msg + " due: " + cause.getMessage();
}
- }
- if (e != null && logStackTrace) {
- logger.log(msg, e, newLogLevel);
+ // should we include message history
+ if (!shouldRedeliver && currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
+ // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+ ExchangeFormatter formatter = customExchangeFormatter
+ ? exchangeFormatter : (currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+ String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
+ if (routeStackTrace != null) {
+ msg = msg + "\n" + routeStackTrace;
+ }
+ }
+
+ if (newLogLevel == LoggingLevel.ERROR) {
+ // log intended rollback on maximum WARN level (not ERROR)
+ logger.log(msg, LoggingLevel.WARN);
+ } else {
+ // otherwise use the desired logging level
+ logger.log(msg, newLogLevel);
+ }
} else {
- logger.log(msg, newLogLevel);
- }
- } else if (exchange.isRollbackOnly()) {
- String msg = "Rollback " + ExchangeHelper.logIds(exchange);
- Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
- if (cause != null) {
- msg = msg + " due: " + cause.getMessage();
- }
+ String msg = message;
+ // should we include message history
+ if (!shouldRedeliver && currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
+ // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+ ExchangeFormatter formatter = customExchangeFormatter
+ ? exchangeFormatter : (currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+ String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
+ if (routeStackTrace != null) {
+ msg = msg + "\n" + routeStackTrace;
+ }
+ }
- // should we include message history
- if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
- // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
- ExchangeFormatter formatter = customExchangeFormatter
- ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
- String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
- if (routeStackTrace != null) {
- msg = msg + "\n" + routeStackTrace;
+ if (e != null && logStackTrace) {
+ logger.log(msg, e, newLogLevel);
+ } else {
+ logger.log(msg, newLogLevel);
}
}
+ }
- if (newLogLevel == LoggingLevel.ERROR) {
- // log intended rollback on maximum WARN level (no ERROR)
- logger.log(msg, LoggingLevel.WARN);
- } else {
- // otherwise use the desired logging level
- logger.log(msg, newLogLevel);
- }
- } else {
- String msg = message;
- // should we include message history
- if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
- // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
- ExchangeFormatter formatter = customExchangeFormatter
- ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
- String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
- if (routeStackTrace != null) {
- msg = msg + "\n" + routeStackTrace;
- }
+ /**
+ * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
+ * <p/>
+ * If the exchange is exhausted, then we will not continue processing, but let the
+ * failure processor deal with the exchange.
+ *
+ * @param exchange the current exchange
+ * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
+ */
+ private boolean isExhausted(Exchange exchange) {
+ // if marked as rollback only then do not continue/redeliver
+ boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
+ if (exhausted) {
+ log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
+ return true;
}
- if (e != null && logStackTrace) {
- logger.log(msg, e, newLogLevel);
- } else {
- logger.log(msg, newLogLevel);
+ // if marked as rollback only then do not continue/redeliver
+ boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
+ if (rollbackOnly) {
+ log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
+ return true;
}
+ // its the first original call so continue
+ if (redeliveryCounter == 0) {
+ return false;
+ }
+ // its a potential redelivery so determine if we should redeliver or not
+ boolean redeliver = currentRedeliveryPolicy.shouldRedeliver(exchange, redeliveryCounter, retryWhilePredicate);
+ return !redeliver;
}
- }
- /**
- * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
- * <p/>
- * If the exchange is exhausted, then we will not continue processing, but let the
- * failure processor deal with the exchange.
- *
- * @param exchange the current exchange
- * @param data the redelivery data
- * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
- */
- private boolean isExhausted(Exchange exchange, RedeliveryData data) {
- // if marked as rollback only then do not continue/redeliver
- boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
- if (exhausted) {
- log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
- return true;
+ /**
+ * Determines whether or not to continue if we are exhausted.
+ *
+ * @param exchange the current exchange
+ * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
+ */
+ private boolean shouldContinue(Exchange exchange) {
+ if (continuedPredicate != null) {
+ return continuedPredicate.matches(exchange);
+ }
+ // do not continue by default
+ return false;
}
- // if marked as rollback only then do not continue/redeliver
- boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
- if (rollbackOnly) {
- log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
- return true;
- }
- // its the first original call so continue
- if (data.redeliveryCounter == 0) {
+ /**
+ * Determines whether or not to handle if we are exhausted.
+ *
+ * @param exchange the current exchange
+ * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
+ */
+ private boolean shouldHandle(Exchange exchange) {
+ if (handledPredicate != null) {
+ return handledPredicate.matches(exchange);
+ }
+ // do not handle by default
return false;
}
- // its a potential redelivery so determine if we should redeliver or not
- boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
- return !redeliver;
- }
- /**
- * Determines whether or not to continue if we are exhausted.
- *
- * @param exchange the current exchange
- * @param data the redelivery data
- * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
- */
- private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
- if (data.continuedPredicate != null) {
- return data.continuedPredicate.matches(exchange);
+ /**
+ * Increments the redelivery counter and adds the redelivered flag if the
+ * message has been redelivered
+ */
+ private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
+ Message in = exchange.getIn();
+ Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+ int next = counter != null ? counter + 1 : 1;
+ in.setHeader(Exchange.REDELIVERY_COUNTER, next);
+ in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+ // if maximum redeliveries is used, then provide that information as well
+ if (currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
+ in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, currentRedeliveryPolicy.getMaximumRedeliveries());
+ }
+ return next;
}
- // do not continue by default
- return false;
- }
- /**
- * Determines whether or not to handle if we are exhausted.
- *
- * @param exchange the current exchange
- * @param data the redelivery data
- * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
- */
- private boolean shouldHandle(Exchange exchange, RedeliveryData data) {
- if (data.handledPredicate != null) {
- return data.handledPredicate.matches(exchange);
- }
- // do not handle by default
- return false;
- }
+ /**
+ * Method for sleeping during redelivery attempts.
+ * <p/>
+ * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
+ * is used for sleeping and trigger redeliveries.
+ */
+ public boolean sleep() throws InterruptedException {
+ // for small delays then just sleep
+ if (redeliveryDelay < 1000) {
+ currentRedeliveryPolicy.sleep(redeliveryDelay);
+ return true;
+ }
- /**
- * Increments the redelivery counter and adds the redelivered flag if the
- * message has been redelivered
- */
- private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
- Message in = exchange.getIn();
- Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
- int next = 1;
- if (counter != null) {
- next = counter + 1;
- }
- in.setHeader(Exchange.REDELIVERY_COUNTER, next);
- in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
- // if maximum redeliveries is used, then provide that information as well
- if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
- in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
+ StopWatch watch = new StopWatch();
+
+ log.debug("Sleeping for: {} millis until attempting redelivery", redeliveryDelay);
+ while (watch.taken() < redeliveryDelay) {
+ // sleep using 1 sec interval
+
+ long delta = redeliveryDelay - watch.taken();
+ long max = Math.min(1000, delta);
+ if (max > 0) {
+ log.trace("Sleeping for: {} millis until waking up for re-check", max);
+ Thread.sleep(max);
+ }
+
+ // are we preparing for shutdown then only do redelivery if allowed
+ if (preparingShutdown && !currentRedeliveryPolicy.isAllowRedeliveryWhileStopping()) {
+ log.debug("Rejected redelivery while stopping");
+ return false;
+ }
+ }
+
+ return true;
}
- return next;
}
/**
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
index ffb2c58..a779716 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -44,6 +44,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ExpressionComparator;
import org.apache.camel.support.LoggingExceptionHandler;
@@ -73,7 +74,7 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
private Expression expression;
private final CamelContext camelContext;
- private final Processor processor;
+ private final AsyncProcessor processor;
private final Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
@@ -96,7 +97,7 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
// wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
this.camelContext = camelContext;
- this.processor = processor;
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
this.collection = collection;
this.expression = expression;
this.sender = new BatchSender();
@@ -293,8 +294,11 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
* Strategy Method to process an exchange in the batch. This method allows derived classes to perform
* custom processing before or after an individual exchange is processed
*/
- protected void processExchange(Exchange exchange) throws Exception {
- processor.process(exchange);
+ protected void processExchange(Exchange exchange) {
+ processor.process(exchange, sync -> postProcess(exchange));
+ }
+
+ protected void postProcess(Exchange exchange) {
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
}
@@ -519,13 +523,8 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
while (iter.hasNext()) {
Exchange exchange = iter.next();
iter.remove();
- try {
- log.debug("Sending aggregated exchange: {}", exchange);
- processExchange(exchange);
- } catch (Throwable t) {
- // must catch throwable to avoid growing memory
- getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
- }
+ log.debug("Sending aggregated exchange: {}", exchange);
+ processExchange(exchange);
}
}
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 05e9a1e..a928f49 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -399,7 +399,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
}
} catch (Exception e) {
// error resolving endpoint so we should break out
- ex.setException(e);
+ current.setException(e);
break;
}
@@ -426,7 +426,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
// okay we are completely done with the routing slip
// so we need to signal done on the original callback so it can continue
- originalCallback.done(false);
+ cb.done(false);
}
});
});
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index b641ee5..b711ad7 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -124,14 +124,14 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
final Exchange target = configureExchange(exchange, pattern);
final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), target, destination);
- StopWatch sw = null;
+ // record timing for sending the exchange using the producer
+ StopWatch watch;
if (sending) {
- sw = new StopWatch();
+ watch = new StopWatch();
+ } else {
+ watch = null;
}
- // record timing for sending the exchange using the producer
- final StopWatch watch = sw;
-
try {
log.debug(">>>> {} {}", destination, exchange);
return producer.process(exchange, new AsyncCallback() {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 785bfcc..9848a6b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -33,6 +33,7 @@ import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.ReactiveHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,30 +82,23 @@ public class SharedCamelInternalProcessor {
*/
public void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor) {
final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
- final CountDownLatch latch = new CountDownLatch(1);
-
- boolean sync = process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- awaitManager.countDown(exchange, latch);
- }
+ awaitManager.process(new AsyncProcessor() {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ return SharedCamelInternalProcessor.this.process(exchange, callback, processor, resultProcessor);
}
@Override
- public String toString() {
- return "Done " + processor;
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalStateException();
}
- }, processor, resultProcessor);
-
- if (!sync) {
- awaitManager.await(exchange, latch);
- }
+ }, exchange);
}
/**
* Asynchronous API
*/
- public boolean process(Exchange exchange, AsyncCallback callback, AsyncProcessor processor, Processor resultProcessor) {
+ public boolean process(Exchange exchange, AsyncCallback ocallback, AsyncProcessor processor, Processor resultProcessor) {
// ----------------------------------------------------------
// CAMEL END USER - READ ME FOR DEBUGGING TIPS
// ----------------------------------------------------------
@@ -121,7 +115,7 @@ public class SharedCamelInternalProcessor {
if (processor == null || !continueProcessing(exchange, processor)) {
// no processor or we should not continue then we are done
- callback.done(true);
+ ocallback.done(true);
return true;
}
@@ -135,13 +129,13 @@ public class SharedCamelInternalProcessor {
states[i] = state;
} catch (Throwable e) {
exchange.setException(e);
- callback.done(true);
+ ocallback.done(true);
return true;
}
}
// create internal callback which will execute the advices in reverse order when done
- callback = new InternalCallback(states, exchange, callback, resultProcessor);
+ AsyncCallback callback = new InternalCallback(states, exchange, ocallback, resultProcessor);
// UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
@@ -188,15 +182,17 @@ public class SharedCamelInternalProcessor {
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- // execute any after processor work (in current thread, not in the callback)
- if (uow != null) {
- uow.afterProcess(processor, exchange, callback, sync);
- }
+ ReactiveHelper.scheduleLast(() -> {
+ // execute any after processor work (in current thread, not in the callback)
+ if (uow != null) {
+ uow.afterProcess(processor, exchange, callback, sync);
+ }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
- new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
+ exchange.getExchangeId(), exchange);
+ }
+ }, "SharedCamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
return sync;
}
}
@@ -249,7 +245,7 @@ public class SharedCamelInternalProcessor {
// CAMEL END USER - DEBUG ME HERE +++ START +++
// ----------------------------------------------------------
// callback must be called
- callback.done(doneSync);
+ ReactiveHelper.callback(callback);
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index a535efb..a804f62 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -119,12 +119,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
throw exchange.getException();
}
- Iterable<ProcessorExchangePair> answer;
- if (isStreaming()) {
- answer = createProcessorExchangePairsIterable(exchange, value);
- } else {
- answer = createProcessorExchangePairsList(exchange, value);
- }
+ Iterable<ProcessorExchangePair> answer = isStreaming()
+ ? createProcessorExchangePairsIterable(exchange, value)
+ : createProcessorExchangePairsList(exchange, value);
if (exchange.getException() != null) {
// force any exceptions occurred during creation of exchange paris to be thrown
// before returning the answer;
@@ -245,8 +242,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
}
@Override
- protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
- Iterator<ProcessorExchangePair> it) {
+ protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
// do not share unit of work
exchange.setUnitOfWork(null);
@@ -255,7 +251,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
// non streaming mode, so we know the total size already
exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
}
- if (it.hasNext()) {
+ if (hasNext) {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
} else {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
index 37dd7ee..7414f4c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
@@ -82,10 +82,6 @@ public class ThroughputLogger extends ServiceSupport implements AsyncProcessor,
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
@@ -98,7 +94,14 @@ public class ThroughputLogger extends ServiceSupport implements AsyncProcessor,
logger.log(lastLogMessage);
}
}
+ }
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
callback.done(true);
return true;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
index 56cc9c5..e8178b1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.support.ServiceSupport;
@@ -64,80 +65,49 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- Iterator<Processor> processors = next().iterator();
- Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
+ ReactiveHelper.schedule(new TryState(exchange, callback));
+ return false;
+ }
- while (continueRouting(processors, exchange)) {
- exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
- ExchangeHelper.prepareOutToIn(exchange);
+ class TryState implements Runnable {
- // process the next processor
- Processor processor = processors.next();
- AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
- boolean sync = process(exchange, callback, processors, async, lastHandled);
+ final Exchange exchange;
+ final AsyncCallback callback;
+ final Iterator<Processor> processors;
+ final Object lastHandled;
- // continue as long its being processed synchronously
- if (!sync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
- // the remainder of the try .. catch .. finally will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
- }
-
- log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+ public TryState(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.processors = next().iterator();
+ this.lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
}
- ExchangeHelper.prepareOutToIn(exchange);
- exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
- log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- callback.done(true);
- return true;
- }
-
- protected boolean process(final Exchange exchange, final AsyncCallback callback,
- final Iterator<Processor> processors, final AsyncProcessor processor,
- final Object lastHandled) {
- // this does the actual processing so log at trace level
- log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
- // implement asynchronous routing logic in callback so we can have the callback being
- // triggered and then continue routing where we left
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // we only have to handle async completion of the pipeline
- if (doneSync) {
- return;
- }
-
- // continue processing the try .. catch .. finally asynchronously
- while (continueRouting(processors, exchange)) {
- exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
- ExchangeHelper.prepareOutToIn(exchange);
-
- // process the next processor
- AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
- doneSync = process(exchange, callback, processors, processor, lastHandled);
-
- if (!doneSync) {
- log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
- // the remainder of the try .. catch .. finally will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return;
- }
- }
+ @Override
+ public void run() {
+ if (continueRouting(processors, exchange)) {
+ exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+ ExchangeHelper.prepareOutToIn(exchange);
+ // process the next processor
+ Processor processor = processors.next();
+ AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
+ log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+ } else {
ExchangeHelper.prepareOutToIn(exchange);
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
callback.done(false);
}
- });
+ }
- return sync;
+ public String toString() {
+ return "TryState[" + exchange.getExchangeId() + "]";
+ }
}
protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 40de8e6..b2d829c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -251,19 +251,6 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann
+ " See more details at the InterceptStrategy javadoc."
+ " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
+ " but its not the most optimal solution. Please consider changing your interceptor to comply.");
-
- // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
- // however its not the most optimal solution, but we can still run.
- InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
- wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
- // Avoid the stack overflow
- if (!wrapped.equals(bridge)) {
- bridge.setTarget(wrapped);
- } else {
- // Just skip the wrapped processor
- bridge.setTarget(null);
- }
- wrapped = bridge;
}
if (!(wrapped instanceof WrapProcessor)) {
// wrap the target so it becomes a service and we can manage its lifecycle
diff --git a/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java b/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
index 645fe67..5e92eea 100644
--- a/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
+++ b/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
@@ -38,7 +38,7 @@ class DelayReifier extends ExpressionReifier<DelayDefinition> {
Processor childProcessor = this.createChildProcessor(routeContext, false);
Expression delay = createAbsoluteTimeDelayExpression(routeContext);
- boolean async = definition.getAsyncDelayed() != null && definition.getAsyncDelayed();
+ boolean async = definition.getAsyncDelayed() == null || definition.getAsyncDelayed();
boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, async);
ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", definition, async);
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
index 70c00f7..cea7343 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
@@ -46,23 +46,7 @@ public final class AsyncProcessorHelper {
*/
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
-
- final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- awaitManager.countDown(exchange, latch);
- }
- }
-
- @Override
- public String toString() {
- return "Done " + processor;
- }
- });
- if (!sync) {
- awaitManager.await(exchange, latch);
- }
+ awaitManager.process(processor, exchange);
}
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
new file mode 100644
index 0000000..f5a66e6
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReactiveHelper {
+
+ private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReactiveHelper.class);
+
+ private ReactiveHelper() {
+ }
+
+ public static void scheduleMain(Runnable runnable) {
+ WORKERS.get().schedule(runnable, true, true);
+ }
+
+ public static void scheduleMain(Runnable runnable, String description) {
+ WORKERS.get().schedule(describe(runnable, description), true, true);
+ }
+
+ public static void schedule(Runnable runnable) {
+ WORKERS.get().schedule(runnable, true, false);
+ }
+
+ public static void schedule(Runnable runnable, String description) {
+ WORKERS.get().schedule(describe(runnable, description), true, false);
+ }
+
+ public static void scheduleLast(Runnable runnable, String description) {
+ WORKERS.get().schedule(describe(runnable, description), false, false);
+ }
+
+ public static boolean executeFromQueue() {
+ return WORKERS.get().executeFromQueue();
+ }
+
+ public static void callback(AsyncCallback callback) {
+ schedule(new Runnable() {
+ @Override
+ public void run() {
+ callback.done(false);
+ }
+ @Override
+ public String toString() {
+ return "Callback[" + callback + "]";
+ }
+ });
+ }
+
+ private static Runnable describe(Runnable runnable, String description) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ runnable.run();
+ }
+ @Override
+ public String toString() {
+ return description;
+ }
+ };
+ }
+
+ private static class Worker {
+
+ LinkedList<Runnable> queue = new LinkedList<>();
+ LinkedList<LinkedList<Runnable>> back;
+ boolean running;
+
+ public void schedule(Runnable runnable, boolean first, boolean main) {
+ if (main) {
+ if (!queue.isEmpty()) {
+ if (back == null) {
+ back = new LinkedList<>();
+ }
+ back.push(queue);
+ queue = new LinkedList<>();
+ }
+ }
+ if (first) {
+ queue.addFirst(runnable);
+ } else {
+ queue.addLast(runnable);
+ }
+ if (!running) {
+ running = true;
+// Thread thread = Thread.currentThread();
+// String name = thread.getName();
+ try {
+ for (; ; ) {
+ final Runnable polled = queue.poll();
+ if (polled == null) {
+ if (back != null && !back.isEmpty()) {
+ queue = back.poll();
+ continue;
+ } else {
+ break;
+ }
+ }
+ try {
+// thread.setName(name + " - " + polled.toString());
+ polled.run();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ } finally {
+// thread.setName(name);
+ running = false;
+ }
+ } else {
+ LOG.debug("Queuing reactive work: {}", runnable);
+ }
+ }
+
+ public boolean executeFromQueue() {
+ final Runnable polled = queue != null ? queue.poll() : null;
+ if (polled == null) {
+ return false;
+ }
+ Thread thread = Thread.currentThread();
+ String name = thread.getName();
+ try {
+ thread.setName(name + " - " + polled.toString());
+ polled.run();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ thread.setName(name);
+ }
+ return true;
+ }
+
+ }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
index bbaf72b..b856ac0 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
@@ -39,10 +39,10 @@ public class SedaBlockWhenFullTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from(BLOCK_WHEN_FULL_URI).delay(DELAY_LONG).to(MOCK_URI);
+ from(BLOCK_WHEN_FULL_URI).delay(DELAY_LONG).syncDelayed().to(MOCK_URI);
// use same delay as above on purpose
- from(DEFAULT_URI).delay(DELAY).to("mock:whatever");
+ from(DEFAULT_URI).delay(DELAY).syncDelayed().to("mock:whatever");
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
index 3006ff8..f9232ea 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
@@ -27,8 +27,8 @@ import org.junit.Test;
public class SedaDefaultBlockWhenFullTest extends ContextTestSupport {
private static final int QUEUE_SIZE = 1;
- private static final int DELAY = 10;
- private static final int DELAY_LONG = 100;
+ private static final int DELAY = 100;
+ private static final int DELAY_LONG = 1000;
private static final String MOCK_URI = "mock:blockWhenFullOutput";
private static final String SIZE_PARAM = "?size=%d";
private static final String BLOCK_WHEN_FULL_URI = "seda:blockingFoo" + String.format(SIZE_PARAM, QUEUE_SIZE) + "&timeout=0";
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
index 2d20cb9..e29ae9f 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
@@ -16,13 +16,20 @@
*/
package org.apache.camel.impl;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
import org.apache.camel.FailedToStartRouteException;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultEndpoint;
import org.junit.Test;
@@ -64,6 +71,7 @@ public class MultipleConsumersSupportTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
MyOtherEndpoint my = new MyOtherEndpoint();
+ my.setCamelContext(context);
from(my).to("mock:a");
from(my).to("mock:b");
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
similarity index 61%
copy from camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
copy to camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
index e7a000d..bbb0772 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
@@ -16,23 +16,19 @@
*/
package org.apache.camel.impl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
-
- private static String foo = "";
- private static CountDownLatch latch = new CountDownLatch(1);
+public class PendingExchangesAsyncDelayShutdownGracefulTest extends ContextTestSupport {
@Test
public void testShutdownGraceful() throws Exception {
- getMockEndpoint("mock:foo").expectedMinimumMessageCount(1);
+ MockEndpoint result = getMockEndpoint("mock:result");
+ MockEndpoint foo = getMockEndpoint("mock:foo");
+
+ foo.expectedMinimumMessageCount(1);
template.sendBody("seda:foo", "A");
template.sendBody("seda:foo", "B");
@@ -42,12 +38,9 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
- // now stop the route before its complete
- assertTrue(latch.await(10, TimeUnit.SECONDS));
context.stop();
- // it should wait as there was 1 inflight exchange and 4 pending messages left
- assertEquals("Should graceful shutdown", "ABCDE", foo);
+ assertEquals("Expecting all messages", 5, result.getReceivedCounter());
}
@Override
@@ -55,12 +48,10 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo").to("mock:foo").delay(500).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- foo = foo + exchange.getIn().getBody(String.class);
- latch.countDown();
- }
- });
+ from("seda:foo")
+ .to("mock:foo")
+ .delay(1000)
+ .to("mock:result");
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
index e7a000d..e12823a 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
@@ -55,7 +55,7 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo").to("mock:foo").delay(500).process(new Processor() {
+ from("seda:foo").to("mock:foo").delay(500).syncDelayed().process(new Processor() {
public void process(Exchange exchange) throws Exception {
foo = foo + exchange.getIn().getBody(String.class);
latch.countDown();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
index db6fcad..839fc78 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
@@ -63,14 +63,14 @@ public class PendingExchangesTwoRouteShutdownGracefulTest extends ContextTestSup
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo").to("mock:foo").delay(100).process(new Processor() {
+ from("seda:foo").to("mock:foo").delay(100).syncDelayed().process(new Processor() {
public void process(Exchange exchange) throws Exception {
foo = foo + exchange.getIn().getBody(String.class);
latch.countDown();
}
});
- from("seda:bar").to("mock:bar").delay(50).process(new Processor() {
+ from("seda:bar").to("mock:bar").delay(50).syncDelayed().process(new Processor() {
public void process(Exchange exchange) throws Exception {
bar = bar + exchange.getIn().getBody(String.class);
latch.countDown();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java b/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
index 4f45c1a..9605543 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
@@ -20,6 +20,8 @@ import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.DelegateProcessor;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.event.CamelContextStartedEvent;
@@ -100,7 +102,11 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport {
assertEquals("should be DLC", true, e.isDeadLetterChannel());
assertTrue("should be marked as failure handled", e.isHandled());
assertFalse("should not be continued", e.isContinued());
- SendProcessor send = assertIsInstanceOf(SendProcessor.class, e.getFailureHandler());
+ Processor fh = e.getFailureHandler();
+ if (fh.getClass().getName().endsWith("ProcessorToReactiveProcessorBridge")) {
+ fh = ((DelegateProcessor) fh).getProcessor();
+ }
+ SendProcessor send = assertIsInstanceOf(SendProcessor.class, fh);
assertEquals("mock://dead", send.getDestination().getEndpointUri());
assertEquals("mock://dead", e.getDeadLetterUri());
diff --git a/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
index f20b06c..21c4dbe 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
@@ -73,7 +73,7 @@ public class CharlesSplitAndTryCatchRollbackIssueTest extends ContextTestSupport
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 2."));
+ assertTrue(ee.getMessage().startsWith("Multicast processing failed for number 2."));
RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
assertTrue(re.getMessage().startsWith("Intended rollback"));
}
@@ -96,7 +96,7 @@ public class CharlesSplitAndTryCatchRollbackIssueTest extends ContextTestSupport
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 3."));
+ assertTrue(ee.getMessage().startsWith("Multicast processing failed for number 3."));
RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
assertTrue(re.getMessage().startsWith("Intended rollback"));
}
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
index 736ebe0..b4d96f0 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -50,10 +50,13 @@ public class RecipientListParallelWithAggregationStrategyThrowingExceptionTest e
// must use share UoW if we want the error handler to react on
// exceptions
// from the aggregation strategy also.
- from("direct:start").
- recipientList(header("recipients")).aggregationStrategy(new MyAggregateBean()).
- parallelProcessing().stopOnAggregateException().shareUnitOfWork()
- .end()
+ from("direct:start")
+ .recipientList(header("recipients"))
+ .aggregationStrategy(new MyAggregateBean())
+ .parallelProcessing()
+ .stopOnAggregateException()
+ .shareUnitOfWork()
+ .end()
.to("mock:end");
}
};
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
index 7f46e3a..727c230 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
@@ -58,18 +58,6 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
}
assertMockEndpointsSatisfied();
}
- List<Thread> aggregatorThreads = getAggregatorThreads();
- assertEquals(1, aggregatorThreads.size());
- }
-
- private List<Thread> getAggregatorThreads() {
- List<Thread> result = new ArrayList<>();
- for (Thread t : Thread.getAllStackTraces().keySet()) {
- if (t.getName().endsWith("Splitter-AggregateTask")) {
- result.add(t);
- }
- }
- return result;
}
@Override
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
index 8b55afd..83f341f 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
@@ -83,7 +83,7 @@ public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSuppo
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(10).redeliveryDelay(10))
+ from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(10).redeliveryDelay(100L))
.to("log:before")
// will use our custom pool
.threads()
@@ -94,7 +94,7 @@ public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSuppo
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
- latch.await(5, TimeUnit.SECONDS);
+ latch.await(500, TimeUnit.MILLISECONDS);
}
})
.to("log:after")
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
index 7e1a838..d29e7f4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
@@ -20,9 +20,6 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.junit.Test;
-/**
- * @version
- */
public class LoopAsyncCopyTest extends ContextTestSupport {
@Test
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
index 381a564..507ed14 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
@@ -20,9 +20,6 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.junit.Test;
-/**
- * @version
- */
public class LoopAsyncNoCopyTest extends ContextTestSupport {
@Test
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
index 23092ad..6948265 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.camel.processor;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -70,7 +71,7 @@ public class MulticastParallelStopOnExceptionTest extends ContextTestSupport {
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
+ assertTrue(cause.getMessage().startsWith("Multicast processing failed for number "));
assertEquals("Forced", cause.getCause().getMessage());
String body = cause.getExchange().getIn().getBody(String.class);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
index 6ae9480..40d3063 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
@@ -76,7 +76,7 @@ public class MulticastParallelStreamingTest extends ContextTestSupport {
.end()
.to("mock:result");
- from("direct:a").delay(250).setBody(constant("A"));
+ from("direct:a").delay(250).asyncDelayed().setBody(constant("A"));
from("direct:b").setBody(constant("B"));
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
index c2b82ca..39b4d23 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
@@ -51,7 +51,7 @@ public class MulticastStopOnExceptionTest extends ContextTestSupport {
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1."));
+ assertTrue(cause.getMessage().startsWith("Multicast processing failed for number 1."));
assertEquals("Forced", cause.getCause().getMessage());
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
index d20f77f..f26885e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.processor;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
@@ -25,10 +28,12 @@ import org.junit.Test;
/**
* Unit test for navigating a route (runtime processors, not the model).
+ *
+ * @version
*/
public class NavigateRouteTest extends ContextTestSupport {
- private static int count;
+ private static List<Processor> processors = new ArrayList<>();
@Test
public void testNavigateRoute() throws Exception {
@@ -42,7 +47,7 @@ public class NavigateRouteTest extends ContextTestSupport {
Navigate<Processor> nav = context.getRoutes().get(0).navigate();
navigateRoute(nav);
- assertEquals("There should be 6 processors to navigate", 6, count);
+ assertEquals("There should be 6 processors to navigate", 6, processors.size());
}
@SuppressWarnings("unchecked")
@@ -50,9 +55,12 @@ public class NavigateRouteTest extends ContextTestSupport {
if (!nav.hasNext()) {
return;
}
+ if (nav.getClass().getName().endsWith("ProcessorToReactiveProcessorBridge")) {
+ nav = (Navigate) ((Navigate) nav).next().get(0);
+ }
for (Processor child : nav.next()) {
- count++;
+ processors.add(child);
if (child instanceof SendProcessor) {
SendProcessor send = (SendProcessor) child;
@@ -67,7 +75,7 @@ public class NavigateRouteTest extends ContextTestSupport {
// navigate children
if (child instanceof Navigate) {
navigateRoute((Navigate<Processor>) child);
- }
+ }
}
}
@@ -77,9 +85,9 @@ public class NavigateRouteTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .convertBodyTo(String.class)
- .split(body().tokenize(" "))
- .to("mock:result");
+ .convertBodyTo(String.class)
+ .split(body().tokenize(" "))
+ .to("mock:result");
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
index d42cc53..5cc6240 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
@@ -42,6 +42,10 @@ public class RecipientListContextScopedOnExceptionIssueTest extends ContextTestS
String routeId = exchange.getUnitOfWork().getRouteContext().getRoute().getId();
assertEquals("fail", routeId);
}
+ @Override
+ public String toString() {
+ return "AssertRouteId";
+ }
}).to("mock:error");
interceptSendToEndpoint("direct*").process(new Processor() {
@@ -49,6 +53,9 @@ public class RecipientListContextScopedOnExceptionIssueTest extends ContextTestS
String target = exchange.getIn().getHeader(Exchange.INTERCEPTED_ENDPOINT, String.class);
exchange.getIn().setHeader("target", target);
}
+ public String toString() {
+ return "SetTargetHeader";
+ }
});
from("direct:start").routeId("start")
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
index 4c4a572..8ae88c6 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
public class ShutdownCompleteAllTasksTest extends ContextTestSupport {
- private static String url = "file:target/pending?initialDelay=0&delay=10";
+ private static String url = "file:target/pending?initialDelay=0&delay=10&synchronous=true";
private static AtomicInteger counter = new AtomicInteger();
private static CountDownLatch latch = new CountDownLatch(2);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
index 9ca4ae0..f9a23f7 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
public class ShutdownCompleteCurrentTaskOnlyTest extends ContextTestSupport {
- private static String url = "file:target/pending?initialDelay=0&delay=10";
+ private static String url = "file:target/pending?initialDelay=0&delay=10&synchronous=true";
@Override
@Before
@@ -65,7 +65,7 @@ public class ShutdownCompleteCurrentTaskOnlyTest extends ContextTestSupport {
from(url)
// let it complete only current task so we shutdown faster
.shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
- .delay(1000).to("seda:foo");
+ .delay(1000).syncDelayed().to("seda:foo");
from("seda:foo").routeId("route2").to("mock:bar");
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
index 90067b6..f8e2303 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
@@ -67,7 +67,7 @@ public class SplitterParallelStopOnExceptionTest extends ContextTestSupport {
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
+ assertTrue(cause.getMessage().startsWith("Multicast processing failed for number "));
assertEquals("Forced", cause.getCause().getMessage());
String body = cause.getExchange().getIn().getBody(String.class);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
index 5aaeb09..c7e4542 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
@@ -48,7 +48,7 @@ public class SplitterStopOnExceptionTest extends ContextTestSupport {
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1."));
+ assertTrue(cause.getMessage().startsWith("Multicast processing failed for number 1."));
assertEquals("Forced", cause.getCause().getMessage());
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
index e67e91c..2c94b95 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
@@ -77,7 +77,7 @@ public class ThreadsRejectedExecutionTest extends ContextTestSupport {
.to("log:before")
// will use our custom pool
.threads().executorService(pool).callerRunsWhenRejected(false)
- .delay(200)
+ .delay(200).syncDelayed()
.to("log:after")
.to("mock:result");
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
index 7e65f8f..cd456a3 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
@@ -49,6 +49,8 @@ public class UnmarshalProcessorTest extends TestSupport {
Exchange exchange2 = createExchangeWithBody(context, "body2");
Processor processor = new UnmarshalProcessor(new MyDataFormat(exchange2));
+ exchange.getExchangeId();
+ exchange2.getExchangeId();
processor.process(exchange);
Exception e = exchange.getException();
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
index 2b02338..c0b9c91 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
@@ -18,12 +18,14 @@ package org.apache.camel.processor.async;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.spi.InterceptStrategy;
import org.junit.Test;
@@ -91,13 +93,12 @@ public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
public Processor wrapProcessorInInterceptors(final CamelContext context, final NamedNode definition,
final Processor target, final Processor nextTarget) throws Exception {
- return new Processor() {
- public void process(Exchange exchange) throws Exception {
+ return new DelegateAsyncProcessor(target) {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
// we just want to count number of interceptions
counter.incrementAndGet();
-
// and continue processing the exchange
- target.process(exchange);
+ return super.process(exchange, callback);
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
index f3a943c..09a3745 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
@@ -38,8 +38,7 @@ public class AsyncEndpointRecipientListParallel5Test extends ContextTestSupport
assertMockEndpointsSatisfied();
- // to hard to do parallel async routing so the caller thread is synchronized
- assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertNotEquals("Should use different threads", beforeThreadName, afterThreadName);
}
@Override
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
index d521e52..3c057d4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
@@ -36,8 +36,7 @@ public class AsyncEndpointRecipientListParallelTest extends ContextTestSupport {
String reply = template.requestBody("direct:start", "Hello Camel", String.class);
assertEquals("Bye Camel", reply);
- // to hard to do parallel async routing so the caller thread is synchronized
- assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertNotEquals("Should use different threads", beforeThreadName, afterThreadName);
}
@Override
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
index e17da41..84c48f5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
@@ -71,6 +71,7 @@ public class RoutingSlipEventNotifierTest extends ContextTestSupport {
log.info("Sending: {}", event);
sending++;
} else {
+ log.info("Sent: {}", event);
sent++;
}
}
diff --git a/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java b/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java
new file mode 100644
index 0000000..2e6437d
--- /dev/null
+++ b/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+
+/**
+ * A filtering iterator
+ */
+public class FilterIterator<T> implements Iterator<T>, Closeable {
+
+ private Iterator<T> it;
+ private Predicate<T> filter;
+ private T next;
+ private boolean closed;
+
+ public FilterIterator(Iterator<T> it) {
+ this(it, null);
+ }
+
+ public FilterIterator(Iterator<T> it, Predicate<T> filter) {
+ this.it = it;
+ this.filter = filter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ IOHelper.closeIterator(it);
+ } finally {
+ // we are now closed
+ closed = true;
+ next = null;
+ }
+ }
+
+ public boolean hasNext() {
+ if (next == null) {
+ next = checkNext();
+ }
+ return next != null;
+ }
+
+ public T next() {
+ if (next == null) {
+ next = checkNext();
+ }
+ if (next != null) {
+ T ep = next;
+ next = null;
+ return ep;
+ }
+ throw new NoSuchElementException();
+ }
+
+ public void remove() {
+ it.remove();
+ }
+
+ protected T checkNext() {
+ while (!closed && it.hasNext()) {
+ T ep = it.next();
+ if (ep != null && (filter == null || filter.test(ep))) {
+ return ep;
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
index 98a4c4f..3a5843e 100644
--- a/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
@@ -386,16 +386,14 @@ public final class IOHelper {
}
public static void closeIterator(Object it) throws IOException {
+ if (it instanceof Closeable) {
+ IOHelper.closeWithException((Closeable) it);
+ }
if (it instanceof java.util.Scanner) {
- // special for Scanner which implement the Closeable since JDK7
- java.util.Scanner scanner = (java.util.Scanner) it;
- scanner.close();
- IOException ioException = scanner.ioException();
+ IOException ioException = ((java.util.Scanner) it).ioException();
if (ioException != null) {
throw ioException;
}
- } else if (it instanceof Closeable) {
- IOHelper.closeWithException((Closeable) it);
}
}
diff --git a/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java b/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
new file mode 100644
index 0000000..f421160
--- /dev/null
+++ b/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+public class AsyncCompletionService<V> {
+
+ private final Executor executor;
+ private final boolean ordered;
+ private final PriorityQueue<Task> queue = new PriorityQueue<>();
+ private final AtomicLong nextId = new AtomicLong();
+ private final AtomicLong index = new AtomicLong();
+ private final ReentrantLock lock;
+ private final Condition available;
+
+ public AsyncCompletionService(Executor executor, boolean ordered) {
+ this(executor, ordered, null);
+ }
+
+ public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock) {
+ this.executor = executor;
+ this.ordered = ordered;
+ this.lock = lock != null ? lock : new ReentrantLock();
+ this.available = this.lock.newCondition();
+ }
+
+ public ReentrantLock getLock() {
+ return lock;
+ }
+
+ public void submit(Consumer<Consumer<V>> runner) {
+ Task f = new Task(nextId.getAndIncrement(), runner);
+ this.executor.execute(f);
+ }
+
+ public void skip() {
+ index.incrementAndGet();
+ }
+
+ public V pollUnordered() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ Task t = queue.poll();
+ return t != null ? t.result : null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public V poll() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ Task t = queue.peek();
+ if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
+ queue.poll();
+ return t.result;
+ } else {
+ return null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public V poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ for (;;) {
+ Task t = queue.peek();
+ if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
+ queue.poll();
+ return t.result;
+ }
+ if (nanos <= 0) {
+ return null;
+ } else {
+ nanos = available.awaitNanos(nanos);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public V take() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ for (;;) {
+ Task t = queue.peek();
+ if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
+ queue.poll();
+ return t.result;
+ }
+ available.await();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void complete(Task task) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ queue.add(task);
+ available.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private class Task implements Runnable, Comparable<Task> {
+ private final long id;
+ private final Consumer<Consumer<V>> runner;
+ private V result;
+
+ Task(long id, Consumer<Consumer<V>> runner) {
+ this.id = id;
+ this.runner = runner;
+ }
+
+ @Override
+ public void run() {
+ runner.accept(this::setResult);
+ }
+
+ protected void setResult(V result) {
+ this.result = result;
+ complete(this);
+ }
+
+ public int compareTo(Task other) {
+ return Long.compare(this.id, other.id);
+ }
+
+ public String toString() {
+ return "SubmitOrderedFutureTask[" + this.id + "]";
+ }
+ }
+}
diff --git a/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java b/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
new file mode 100644
index 0000000..565b87a
--- /dev/null
+++ b/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AsyncCompletionServiceTest extends Assert {
+
+ private ExecutorService executor;
+ private AsyncCompletionService<Object> service;
+
+ @Before
+ public void setUp() throws Exception {
+ executor = Executors.newFixedThreadPool(5);
+ service = new AsyncCompletionService<>(executor, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testSubmitOrdered() throws Exception {
+
+ service.submit(result("A"));
+ service.submit(result("B"));
+
+ Object a = service.take();
+ Object b = service.take();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedFirstTaskIsSlow() throws Exception {
+
+ service.submit(result("A", 200));
+ service.submit(result("B"));
+
+ Thread.sleep(300);
+
+ Object a = service.take();
+ Object b = service.take();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedFirstTaskIsSlowUsingPollTimeout() throws Exception {
+
+ service.submit(result("A", 200));
+ service.submit(result("B"));
+
+ Object a = service.poll(5, TimeUnit.SECONDS);
+ Object b = service.poll(5, TimeUnit.SECONDS);
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedFirstTaskIsSlowUsingPoll() throws Exception {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.submit(result("A", latch, 5, TimeUnit.SECONDS));
+ service.submit(result("B"));
+
+ // poll should not get it the first time
+ Object a = service.poll();
+ assertNull(a);
+
+ // and neither the 2nd time
+ a = service.poll();
+ assertNull(a);
+
+ // okay complete task
+ latch.countDown();
+
+ // okay take them
+ a = service.take();
+ Object b = service.take();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedSecondTaskIsSlow() throws Exception {
+
+ service.submit(result("A"));
+ service.submit(result("B", 100));
+
+ Object a = service.take();
+ Object b = service.take();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedSecondTaskIsSlowUsingPollTimeout() throws Exception {
+
+ service.submit(result("A"));
+ service.submit(result("B", 100));
+
+ Object a = service.poll(5, TimeUnit.SECONDS);
+ Object b = service.poll(5, TimeUnit.SECONDS);
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ @Test
+ public void testSubmitOrderedLastTaskIsSlowUsingPoll() throws Exception {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ service.submit(result("A"));
+ service.submit(result("B", latch, 5 ,TimeUnit.SECONDS));
+
+ // take a
+ Object a = service.take();
+ assertNotNull(a);
+
+ // poll should not get it the first time
+ Object b = service.poll();
+ assertNull(b);
+
+ // and neither the 2nd time
+ b = service.poll();
+ assertNull(b);
+
+ // okay complete task
+ latch.countDown();
+
+ // okay take it
+ b = service.take();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ Consumer<Consumer<Object>> result(Object r) {
+ return result -> result.accept(r);
+ }
+
+ Consumer<Consumer<Object>> result(Object r, int delay) {
+ return result -> {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ result.accept(r);
+ };
+ }
+
+ Consumer<Consumer<Object>> result(Object r, CountDownLatch latch, int timeout, TimeUnit unit) {
+ return result -> {
+ try {
+ latch.await(timeout, unit);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ result.accept(r);
+ };
+ }
+
+}
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
index e2544ab..a14a46b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
@@ -35,7 +35,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<!-- let this route complete all its pending messages when asked to shutdown -->
<route id="foo" autoStartup="false" shutdownRunningTask="CompleteAllTasks">
- <from uri="file:target/pending?initialDelay=0&delay=10"/>
+ <from uri="file:target/pending?initialDelay=0&delay=10&synchronous=true"/>
<delay><constant>1000</constant></delay>
<process ref="myProcessor"/>
<to uri="mock:bar"/>
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
index d6989e1..65001a4 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
@@ -31,7 +31,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route startupOrder="1" shutdownRunningTask="CompleteCurrentTaskOnly">
- <from uri="file:target/pending?initialDelay=0&delay=10"/>
+ <from uri="file:target/pending?initialDelay=0&delay=10&synchronous=true"/>
<delay><constant>1000</constant></delay>
<to uri="seda:foo"/>
</route>