You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/01/08 20:01:23 UTC
svn commit: r732793 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/interceptor/
Author: davsclaus
Date: Thu Jan 8 11:01:22 2009
New Revision: 732793
URL: http://svn.apache.org/viewvc?rev=732793&view=rev
Log:
CAMEL-1234: Added onRedelivery option to DeadLetterChannel to allow processing a custom processor before every redelivery attempt.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (contents, props changed)
- copied, changed from r732654, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Thu Jan 8 11:01:22 2009
@@ -38,6 +38,7 @@
*/
public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private Processor onRedelivery;
private ExceptionPolicyStrategy exceptionPolicyStrategy = ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
private ProcessorFactory deadLetterFactory;
private Processor defaultDeadLetterEndpoint;
@@ -63,8 +64,8 @@
}
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
- Processor deadLetter = getDeadLetterFactory().createProcessor();
- DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+ Processor deadLetter = getDeadLetterFactory().createProcessor();
+ DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
configure(answer);
return answer;
}
@@ -162,6 +163,16 @@
return this;
}
+ /**
+ * Sets a processor that should be processed <b>before</b> a redelivery attempt.
+ * <p/>
+ * Can be used to change the {@link org.apache.camel.Exchange} <b>before</b> it is being redelivered.
+ */
+ public DeadLetterChannelBuilder onRedelivery(Processor processor) {
+ setOnRedelivery(processor);
+ return this;
+ }
+
// Properties
// -------------------------------------------------------------------------
public RedeliveryPolicy getRedeliveryPolicy() {
@@ -260,6 +271,14 @@
this.exceptionPolicyStrategy = exceptionPolicyStrategy;
}
+ public Processor getOnRedelivery() {
+ return onRedelivery;
+ }
+
+ public void setOnRedelivery(Processor onRedelivery) {
+ this.onRedelivery = onRedelivery;
+ }
+
@Override
public String toString() {
return "DeadLetterChannelBuilder(" + (deadLetterFactory != null ? deadLetterFactory : defaultDeadLetterEndpoint) + ")";
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Jan 8 11:01:22 2009
@@ -58,6 +58,7 @@
private AsyncProcessor outputAsync;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
+ private Processor redeliveryProcessor;
private class RedeliveryData {
int redeliveryCounter;
@@ -105,22 +106,17 @@
}
}
- public DeadLetterChannel(Processor output, Processor deadLetter) {
- this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
- ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
- }
-
- public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
- this.deadLetter = deadLetter;
+ public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
this.output = output;
+ this.deadLetter = deadLetter;
+ this.redeliveryProcessor = redeliveryProcessor;
this.outputAsync = AsyncProcessorTypeConverter.convert(output);
-
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
setExceptionPolicy(exceptionPolicyStrategy);
}
- public static <E extends Exchange> Logger createDefaultLogger() {
+ public static Logger createDefaultLogger() {
return new Logger(LOG, LoggingLevel.ERROR);
}
@@ -170,7 +166,6 @@
boolean shouldRedeliver = shouldRedeliver(exchange, data);
if (!shouldRedeliver) {
return deliverToFaultProcessor(exchange, callback, data);
-
}
// if we are redelivering then sleep before trying again
@@ -182,6 +177,9 @@
// wait until we should redeliver
data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+
+ // let the onRedelivery processor be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
}
// process the exchange
@@ -257,7 +255,10 @@
// wait until we should redeliver
data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
- }
+
+ // let the onRedelivery processor be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
+ }
}
private void handleException(Exchange exchange, RedeliveryData data) {
@@ -274,13 +275,37 @@
Processor processor = exceptionPolicy.getErrorHandler();
if (processor != null) {
data.failureProcessor = processor;
- }
+ }
}
-
- logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
- + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+
+ String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
+ logFailedDelivery(true, exchange, msg, data, e);
+
data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-
+ }
+
+ /**
+ * Gives an optionally configured redelivery processor a chance to process the Exchange
+ * before it is redelivered. This can be used to alter the Exchange.
+ */
+ private boolean deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
+ final RedeliveryData data) {
+ if (redeliveryProcessor == null) {
+ return true;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange before its redelivered");
+ }
+ AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
+ boolean sync = afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ callback.done(data.sync);
+ }
+ });
+
+ return sync;
}
private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
@@ -299,11 +324,10 @@
}
});
- // The line below shouldn't be needed, it is invoked by the AsyncCallback above
- // restoreExceptionOnExchange(exchange, data.handledPredicate);
- logFailedDelivery(false, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
- + ". Handled by the failure processor: " + data.failureProcessor,
- data, null);
+ String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ + ". Handled by the failure processor: " + data.failureProcessor;
+ logFailedDelivery(false, exchange, msg, data, null);
+
return sync;
}
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (from r732654, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java&r1=732654&r2=732793&rev=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Thu Jan 8 11:01:22 2009
@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor.interceptor;
+package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
/**
* Unit test for testing possibility to modify exchange before redelivering
*/
-public class InterceptAlterMessageBeforeRedeliveryTest extends ContextTestSupport {
+public class DeadLetterChannelOnRedeliveryTest extends ContextTestSupport {
static int counter;
- public void testInterceptAlterMessageBeforeRedelivery() throws Exception {
+ public void testOnExceptionAlterMessageBeforeRedelivery() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World123");
@@ -39,7 +39,7 @@
assertMockEndpointsSatisfied();
}
- public void testInterceptAlterMessageWithHeadersBeforeRedelivery() throws Exception {
+ public void testOnExceptionAlterMessageWithHeadersBeforeRedelivery() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World123");
mock.expectedHeaderReceived("foo", "123");
@@ -60,25 +60,14 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // to execute unit test much faster we dont use delay between redeliveries
- errorHandler(deadLetterChannel("mock:error").delay(0L));
-
// START SNIPPET: e1
- // we configure an interceptor that is triggered when the redelivery flag
- // has been set TRUE on an exchange
- intercept().when(header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE)).
- process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- // the message is being redelivered so we can alter it
-
- // we just append the redelivery counter to the body
- // you can of course do all kind of stuff instead
- String body = exchange.getIn().getBody(String.class);
- int count = exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
-
- exchange.getIn().setBody(body + count);
- }
- });
+ // we configure our Dead Letter Channel to invoke
+ // MyRedeliveryProcessor before a redelivery is
+ // attempted. This allows us to alter the message before it is redelivered
+ errorHandler(deadLetterChannel("mock:error")
+ .onRedelivery(new MyRedeliverPrcessor())
+ // setting delay to zero is just to make unit testing faster
+ .delay(0L));
// END SNIPPET: e1
@@ -95,5 +84,23 @@
};
}
+ // START SNIPPET: e2
+ // This is our processor that is executed before every redelivery attempt
+ // here we can do what we want in the java code, such as altering the message
+ public class MyRedeliverPrcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // the message is being redelivered so we can alter it
+
+ // we just append the redelivery counter to the body
+ // you can of course do all kind of stuff instead
+ String body = exchange.getIn().getBody(String.class);
+ int count = exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
+
+ exchange.getIn().setBody(body + count);
+ }
+ }
+ // END SNIPPET: e2
+
}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java Thu Jan 8 11:01:22 2009
@@ -19,9 +19,9 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
/**
* Unit test for testing possibility to modify exchange before redelivering