You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/17 14:24:21 UTC
svn commit: r955567 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/builder/
main/java/org/apache/camel/impl/converter/
main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Thu Jun 17 12:24:21 2010
New Revision: 955567
URL: http://svn.apache.org/viewvc?rev=955567&view=rev
Log:
CAMEL-2723: Error handlers now support async routing engine with task scheduled redelivery tasks.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java
- copied, changed from r955553, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Thu Jun 17 12:24:21 2010
@@ -53,9 +53,9 @@ public class DeadLetterChannelBuilder ex
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
validateDeadLetterUri(routeContext);
- DeadLetterChannel answer = new DeadLetterChannel(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
- getHandledPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(),
- isUseOriginalMessage());
+ DeadLetterChannel answer = new DeadLetterChannel(routeContext.getCamelContext(), processor, getLogger(),
+ getOnRedelivery(), getRedeliveryPolicy(), getHandledPolicy(), getExceptionPolicyStrategy(),
+ getFailureProcessor(), getDeadLetterUri(), isUseOriginalMessage());
// configure error handler before we can use it
configure(answer);
return answer;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Thu Jun 17 12:24:21 2010
@@ -52,8 +52,8 @@ public class DefaultErrorHandlerBuilder
}
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- DefaultErrorHandler answer = new DefaultErrorHandler(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
- getHandledPolicy(), getExceptionPolicyStrategy());
+ DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(),
+ getOnRedelivery(), getRedeliveryPolicy(), getHandledPolicy(), getExceptionPolicyStrategy());
// configure error handler before we can use it
configure(answer);
return answer;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java Thu Jun 17 12:24:21 2010
@@ -57,6 +57,15 @@ public class AsyncProcessorTypeConverter
}
return true;
}
+
+ @Override
+ public String toString() {
+ if (processor != null) {
+ return processor.toString();
+ } else {
+ return "Processor is null";
+ }
+ }
}
public <T> T convertTo(Class<T> type, Object value) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Jun 17 12:24:21 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
@@ -34,6 +35,7 @@ public class DeadLetterChannel extends R
/**
* Creates the dead letter channel.
*
+ * @param camelContext the camel context
* @param output outer processor that should use this dead letter channel
* @param logger logger to use for logging failures and redelivery attempts
* @param redeliveryProcessor an optional processor to run before redelivery attempt
@@ -44,10 +46,10 @@ public class DeadLetterChannel extends R
* @param deadLetterUri an optional uri for logging purpose
* @param useOriginalBodyPolicy should the original IN body be moved to the dead letter queue or the current exchange IN body?
*/
- public DeadLetterChannel(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+ public DeadLetterChannel(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy,
Processor deadLetter, String deadLetterUri, boolean useOriginalBodyPolicy) {
- super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy);
+ super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy);
setExceptionPolicy(exceptionPolicyStrategy);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Thu Jun 17 12:24:21 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
@@ -31,6 +32,7 @@ public class DefaultErrorHandler extends
/**
* Creates the default error handler.
*
+ * @param camelContext the camel context
* @param output outer processor that should use this default error handler
* @param logger logger to use for logging failures and redelivery attempts
* @param redeliveryProcessor an optional processor to run before redelivery attempt
@@ -38,9 +40,9 @@ public class DefaultErrorHandler extends
* @param handledPolicy policy for handling failed exception that are moved to the dead letter queue
* @param exceptionPolicyStrategy strategy for onException handling
*/
- public DefaultErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
- Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) {
- super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false);
+ public DefaultErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
+ RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) {
+ super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false);
setExceptionPolicy(exceptionPolicyStrategy);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Jun 17 12:24:21 2010
@@ -16,10 +16,14 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
@@ -44,6 +48,8 @@ import org.apache.camel.util.ServiceHelp
*/
public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
+ private static ScheduledExecutorService executorService;
+ protected final CamelContext camelContext;
protected final Processor deadLetter;
protected final String deadLetterUri;
protected final Processor output;
@@ -54,6 +60,9 @@ public abstract class RedeliveryErrorHan
protected final Logger logger;
protected final boolean useOriginalMessagePolicy;
+ /**
+ * Contains the current redelivery data
+ */
protected class RedeliveryData {
boolean sync = true;
int redeliveryCounter;
@@ -70,9 +79,67 @@ public abstract class RedeliveryErrorHan
boolean useOriginalInMessage = useOriginalMessagePolicy;
}
- public RedeliveryErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor,
+ /**
+ * Tasks which performs asynchronous redelivery attempts, and being triggered by a
+ * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
+ * has to be delayed before a redelivery attempt is performed.
+ */
+ private class RedeliveryTask implements Callable<Boolean> {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final RedeliveryData data;
+
+ public RedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.data = data;
+ }
+
+ public Boolean call() throws Exception {
+ // prepare for redelivery
+ prepareExchangeForRedelivery(exchange);
+
+ // letting onRedeliver be executed at first
+ deliverToRedeliveryProcessor(exchange, data);
+
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " -> " + outputAsync + " for Exchange: " + exchange);
+ }
+
+ // process the exchange (also redelivery)
+ boolean sync = outputAsync.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done");
+ }
+
+ // this callback should only handle the async case
+ if (sync) {
+ return;
+ }
+
+ // mark we are in async mode now
+ data.sync = false;
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (isDone(exchange)) {
+ callback.done(sync);
+ return;
+ }
+ // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ processAsyncErrorHandler(exchange, callback, data);
+ }
+ });
+
+ return sync;
+ }
+ }
+
+ public RedeliveryErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
String deadLetterUri, boolean useOriginalMessagePolicy) {
+ this.camelContext = camelContext;
this.redeliveryProcessor = redeliveryProcessor;
this.deadLetter = deadLetter;
this.output = output;
@@ -101,9 +168,11 @@ public abstract class RedeliveryErrorHan
}
/**
- * Processes the exchange decorated with this dead letter channel.
+ * Process the exchange using redelivery error handling.
*/
protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+
+ // use looping to have redelivery attempts
while (true) {
// can we still run
@@ -133,7 +202,7 @@ public abstract class RedeliveryErrorHan
return sync;
}
- if (shouldRedeliver && data.redeliveryCounter > 0) {
+ if (data.redeliveryCounter > 0) {
// prepare for redelivery
prepareExchangeForRedelivery(exchange);
@@ -163,32 +232,43 @@ public abstract class RedeliveryErrorHan
// mark we are in async mode now
data.sync = false;
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
+
+ // if we are done then notify callback and exit
if (isDone(exchange)) {
callback.done(sync);
return;
}
+
// error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ // method which takes care of this in a asynchronous manner
processAsyncErrorHandler(exchange, callback, data);
}
});
+
if (!sync) {
// the remainder of the Exchange is being processed asynchronously so we should return
return false;
}
+ // we continue to route synchronously
- // we route synchronously
+ // if we are done then notify callback and exit
boolean done = isDone(exchange);
if (done) {
callback.done(true);
return true;
}
+
// error occurred so loop back around.....
}
-
}
+ /**
+ * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
+ * <p/>
+ * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
+ * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
+ * in terms of logic.
+ */
protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
// can we still run
if (!isRunAllowed()) {
@@ -217,48 +297,24 @@ public abstract class RedeliveryErrorHan
return;
}
- // TODO: Use a scheduler to schedule the redelivery delay
- // which contains the outputAsync task being executed
- // we can optimize and only use the scheduler if there is a delay
- if (shouldRedeliver && data.redeliveryCounter > 0) {
- // prepare for redelivery
- prepareExchangeForRedelivery(exchange);
-
- // if we are redelivering then sleep before trying again
- // wait until we should redeliver
- try {
- data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+ if (data.redeliveryCounter > 0) {
+ // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
+ // have it being executed in the future, or immediately
+ RedeliveryTask task = new RedeliveryTask(exchange, callback, data);
+
+ // calculate the redelivery delay
+ data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
+ if (data.redeliveryDelay > 0) {
+ // schedule the redelivery task
+ if (log.isTraceEnabled()) {
+ log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
}
- return;
+ executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
+ } else {
+ // execute the task immediately
+ executorService.submit(task);
}
-
- // letting onRedeliver be executed
- deliverToRedeliveryProcessor(exchange, data);
}
-
- // process the exchange (also redelivery)
- outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // this callback should only handle the async case
- if (sync) {
- return;
- }
-
- // mark we are in async mode now
- data.sync = false;
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (isDone(exchange)) {
- callback.done(sync);
- return;
- }
- // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- processAsyncErrorHandler(exchange, callback, data);
- }
- });
}
/**
@@ -277,9 +333,14 @@ public abstract class RedeliveryErrorHan
// only done if the exchange hasn't failed
// and it has not been handled by the failure processor
// or we are exhausted
- return exchange.getException() == null
- || ExchangeHelper.isFailureHandled(exchange)
- || ExchangeHelper.isRedeliveryExhausted(exchange);
+ boolean answer = exchange.getException() == null
+ || ExchangeHelper.isFailureHandled(exchange)
+ || ExchangeHelper.isRedeliveryExhausted(exchange);
+
+ if (log.isTraceEnabled()) {
+ log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " + answer);
+ }
+ return answer;
}
/**
@@ -460,7 +521,7 @@ public abstract class RedeliveryErrorHan
}
try {
prepareExchangeAfterFailure(exchange, data);
- // fire event as we had a failure processor to handle it
+ // fire event as we had a failure processor to handle it, which there is a event for
boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
} finally {
@@ -501,8 +562,8 @@ public abstract class RedeliveryErrorHan
boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
if (alreadySet) {
boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
- if (log.isDebugEnabled()) {
- log.debug("This exchange has already been marked for handling: " + handled);
+ if (log.isTraceEnabled()) {
+ log.trace("This exchange has already been marked for handling: " + handled);
}
if (handled) {
exchange.setException(null);
@@ -516,19 +577,19 @@ public abstract class RedeliveryErrorHan
}
if (shouldHandled(exchange, data)) {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is handled so its marked as not failed: " + exchange);
+ if (log.isTraceEnabled()) {
+ log.trace("This exchange is handled so its marked as not failed: " + exchange);
}
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
} else if (shouldContinue(exchange, data)) {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is continued: " + exchange);
+ if (log.isTraceEnabled()) {
+ log.trace("This exchange is continued: " + exchange);
}
// okay we want to continue then prepare the exchange for that as well
prepareExchangeForContinue(exchange, data);
} else {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is not handled or continued so its marked as failed: " + exchange);
+ if (log.isTraceEnabled()) {
+ log.trace("This exchange is not handled or continued so its marked as failed: " + exchange);
}
// exception not handled, put exception back in the exchange
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
@@ -681,6 +742,14 @@ public abstract class RedeliveryErrorHan
@Override
protected void doStart() throws Exception {
ServiceHelper.startServices(output, outputAsync, deadLetter);
+ // use pool size from default profile
+ int poolSize = camelContext.getExecutorServiceStrategy().getDefaultThreadPoolProfile().getPoolSize();
+ // use a shared scheduler
+ if (executorService == null || executorService.isShutdown()) {
+ // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
+ executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this,
+ getClass().getSimpleName() + "-AsyncRedeliveryTask", poolSize);
+ }
}
@Override
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java (from r955553, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java&r1=955553&r2=955567&rev=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java Thu Jun 17 12:24:21 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncFailureProcessorWithRedeliveryTest extends ContextTestSupport {
+public class AsyncFailureProcessorWithRedeliveryAndDelayTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -50,7 +50,7 @@ public class AsyncFailureProcessorWithRe
context.addComponent("async", new MyAsyncComponent());
// use redelivery up till 5 times
- errorHandler(defaultErrorHandler().maximumRedeliveries(5));
+ errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(1000));
from("direct:start")
.to("mock:before")