You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/17 09:41:31 UTC
svn commit: r955499 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Thu Jun 17 07:41:30 2010
New Revision: 955499
URL: http://svn.apache.org/viewvc?rev=955499&view=rev
Log:
CAMEL-2723: Added more tests. CAMEL-2825: Fixed continued not being logged by default by error handler.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java (contents, props changed)
- copied, changed from r955488, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorTest.java
- copied, changed from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Removed:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=955499&r1=955498&r2=955499&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Jun 17 07:41:30 2010
@@ -117,25 +117,9 @@ public abstract class RedeliveryErrorHan
// or the dead letter queue
Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
// deliver to the failure processor (either an on exception or dead letter queue
- deliverToFailureProcessor(target, exchange, data);
- // prepare the exchange for failure before returning
- prepareExchangeAfterFailure(exchange, data);
- // fire event if we had a failure processor to handle it
- if (target != null) {
- boolean deadLetterChannel = target == data.deadLetterProcessor && data.deadLetterProcessor != null;
- EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, target, deadLetterChannel);
- }
-
- boolean shouldContinue = shouldContinue(exchange, data);
- if (shouldContinue) {
- // okay we want to continue then prepare the exchange for that as well
- prepareExchangeForContinue(exchange, data);
- }
-
- // we are breaking out so invoke the callback
- callback.done(data.sync);
- // and then return
- return data.sync;
+ boolean sync = deliverToFailureProcessor(target, exchange, data, callback);
+ // we are breaking out
+ return sync;
}
if (shouldRedeliver && data.redeliveryCounter > 0) {
@@ -173,6 +157,7 @@ public abstract class RedeliveryErrorHan
if (!isDone(exchange)) {
// TODO: async process redelivery (eg duplicate the error handler logic)
// And have a timer task scheduled when redelivery should occur to avoid blocking thread
+ log.debug("Not done continuing error handling asynchronously: " + exchange);
} else {
callback.done(sync);
}
@@ -325,6 +310,7 @@ public abstract class RedeliveryErrorHan
+ " before its redelivered");
}
+ // run this synchronously as its just a Processor
try {
data.onRedeliveryProcessor.process(exchange);
} catch (Exception e) {
@@ -336,8 +322,9 @@ public abstract class RedeliveryErrorHan
/**
* All redelivery attempts failed so move the exchange to the dead letter queue
*/
- protected void deliverToFailureProcessor(final Processor processor, final Exchange exchange,
- final RedeliveryData data) {
+ protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange,
+ final RedeliveryData data, final AsyncCallback callback) {
+ boolean sync = true;
Exception caught = exchange.getException();
@@ -346,7 +333,8 @@ public abstract class RedeliveryErrorHan
exchange.setException(null);
boolean handled = false;
- if (data.handledPredicate != null && data.handledPredicate.matches(exchange)) {
+ // regard both handled or continued as being handled
+ if (shouldHandled(exchange, data) || shouldContinue(exchange, data)) {
// its handled then remove traces of redelivery attempted
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
@@ -357,10 +345,12 @@ public abstract class RedeliveryErrorHan
decrementRedeliveryCounter(exchange);
}
- // reset cached streams so they can be read again
- MessageHelper.resetStreamCache(exchange.getIn());
-
+ // is the a failure processor to process the Exchange
if (processor != null) {
+
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
// prepare original IN body if it should be moved instead of current body
if (data.useOriginalInMessage) {
if (log.isTraceEnabled()) {
@@ -374,14 +364,38 @@ public abstract class RedeliveryErrorHan
if (log.isTraceEnabled()) {
log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
}
+
+ // store the last to endpoint as the failure endpoint
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+
+ // the failure processor could also be asynchronous
+ AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor);
+ sync = afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Failure processor done: " + processor + " processing Exchange: " + exchange);
+ }
+ try {
+ prepareExchangeAfterFailure(exchange, data);
+ // fire event as we had a failure processor to handle it
+ boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
+ EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
+ } finally {
+ // if the fault was handled asynchronously, this should be reflected in the callback as well
+ data.sync &= sync;
+ callback.done(data.sync);
+ }
+ }
+ });
+ } else {
try {
- // store the last to endpoint as the failure endpoint
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
+ // no processor but we need to prepare after failure as well
+ prepareExchangeAfterFailure(exchange, data);
+ } finally {
+ // indicate we are done synchronously
+ data.sync = true;
+ callback.done(data.sync);
}
- log.trace("Failure processor done");
}
// create log message
@@ -393,6 +407,8 @@ public abstract class RedeliveryErrorHan
// log that we failed delivery as we are exhausted
logFailedDelivery(false, handled, false, exchange, msg, data, null);
+
+ return sync;
}
protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data) {
@@ -417,21 +433,26 @@ public abstract class RedeliveryErrorHan
return;
}
- Predicate handledPredicate = data.handledPredicate;
- if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+ if (shouldHandled(exchange, data)) {
+ if (log.isDebugEnabled()) {
+ log.debug("This exchange is handled so its marked as not failed: " + exchange);
+ }
+ exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
+ } else if (shouldContinue(exchange, data)) {
if (log.isDebugEnabled()) {
- log.debug("This exchange is not handled so its marked as failed: " + exchange);
+ log.debug("This exchange is continued: " + exchange);
+ }
+ // okay we want to continue then prepare the exchange for that as well
+ prepareExchangeForContinue(exchange, data);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("This exchange is not handled or continued so its marked as failed: " + exchange);
}
// exception not handled, put exception back in the exchange
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
// and put failure endpoint back as well
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- } else {
- if (log.isDebugEnabled()) {
- log.debug("This exchange is handled so its marked as not failed: " + exchange);
- }
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
}
}
@@ -527,6 +548,21 @@ public abstract class RedeliveryErrorHan
}
/**
+ * Determines whether or not to handle if we are exhausted.
+ *
+ * @param exchange the current exchange
+ * @param data the redelivery data
+ * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
+ */
+ private boolean shouldHandled(Exchange exchange, RedeliveryData data) {
+ if (data.handledPredicate != null) {
+ return data.handledPredicate.matches(exchange);
+ }
+ // do not handle by default
+ return false;
+ }
+
+ /**
* Increments the redelivery counter and adds the redelivered flag if the
* message has been redelivered
*/
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=955499&r1=955498&r2=955499&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Thu Jun 17 07:41:30 2010
@@ -98,16 +98,18 @@ public final class UnitOfWorkProcessor e
private void doneUow(DefaultUnitOfWork uow, Exchange exchange) {
// unit of work is done
try {
- exchange.getUnitOfWork().done(exchange);
+ if (exchange.getUnitOfWork() != null) {
+ exchange.getUnitOfWork().done(exchange);
+ }
} catch (Throwable e) {
LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
- + ". This exception will be ignored.");
+ + ". This exception will be ignored.", e);
}
try {
uow.stop();
} catch (Throwable e) {
LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
- + ". This exception will be ignored.");
+ + ". This exception will be ignored.", e);
}
exchange.setUnitOfWork(null);
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java (from r955488, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java&r1=955488&r2=955499&rev=955499&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java Thu Jun 17 07:41:30 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointFailedAfterOnExceptionTest extends ContextTestSupport {
+public class AsyncEndpointFailedAfterOnExceptionHandledTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionHandledTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java?rev=955499&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java Thu Jun 17 07:41:30 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailureProcessorContinueTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("MyFailureHandler");
+ getMockEndpoint("mock:result").expectedBodiesReceived("MyFailureHandler");
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ // the onException can be asynchronous as well so we have to test for that
+ onException(IllegalArgumentException.class).continued(true)
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:MyFailureHandler")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after");
+
+ from("direct:start")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result")
+ .transform(constant("Bye Camel"));
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorContinueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorTest.java (from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955480&r2=955499&rev=955499&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailureProcessorTest.java Thu Jun 17 07:41:30 2010
@@ -24,15 +24,15 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointFailureProcessorTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
- getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("MyFailureHandler");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
String reply = template.requestBody("direct:start", "Hello Camel", String.class);
assertEquals("Bye Camel", reply);
@@ -49,7 +49,8 @@ public class AsyncEndpointTest extends C
public void configure() throws Exception {
context.addComponent("async", new MyAsyncComponent());
- from("direct:start")
+ // the onException can be asynchronous as well so we have to test for that
+ onException(IllegalArgumentException.class).handled(true)
.to("mock:before")
.to("log:before")
.process(new Processor() {
@@ -57,7 +58,7 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .to("async:MyFailureHandler")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -65,9 +66,13 @@ public class AsyncEndpointTest extends C
})
.to("log:after")
.to("mock:after")
+ .transform(constant("Bye Camel"));
+
+ from("direct:start")
+ .throwException(new IllegalArgumentException("Damn"))
.to("mock:result");
}
};
}
-}
+}
\ No newline at end of file