You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <ci...@silverbullet.dk> on 2008/10/11 11:19:03 UTC
RE: svn commit: r703622 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/builder/
Hi Hadrian
This one must be a typo
+ public ExceptionType handled(boolean cond) {
+ ConstantLanguage constant = new ConstantLanguage();
+ return handled(constant.createPredicate("true"));
+ }
The Boolean cond parameter is not used!
Med venlig hilsen
Claus Ibsen
......................................
Silverbullet
Skovsgårdsvænget 21
8362 Hørning
Tlf. +45 2962 7576
Web: www.silverbullet.dk
-----Original Message-----
From: hadrian@apache.org [mailto:hadrian@apache.org]
Sent: 11. oktober 2008 03:50
To: camel-commits@activemq.apache.org
Subject: svn commit: r703622 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/builder/
Author: hadrian
Date: Fri Oct 10 18:49:53 2008
New Revision: 703622
URL: http://svn.apache.org/viewvc?rev=703622&view=rev
Log:
CAMEL-960
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Oct 10 18:49:53 2008
@@ -34,6 +34,7 @@
String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";
+ String EXCEPTION_HANDLED_PROPERTY = "CamelExceptionHandled";
/**
* Returns the {@link ExchangePattern} (MEP) of this exchange.
*
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java Fri Oct 10 18:49:53 2008
@@ -27,9 +27,11 @@
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.language.constant.ConstantLanguage;
import org.apache.camel.processor.CatchProcessor;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spi.RouteContext;
@@ -46,7 +48,7 @@
@XmlElement(name = "exception")
private List<String> exceptions = new ArrayList<String>();
- @XmlElement(name = "redeliveryPolicy", required = false)
+ @XmlElement(name = "redeliveryPolicy", required = false)
private RedeliveryPolicyType redeliveryPolicy;
@XmlElementRef
private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
@@ -54,6 +56,8 @@
private List<Class> exceptionClasses;
@XmlTransient
private Processor errorHandler;
+ @XmlTransient
+ private Predicate handledPolicy;
public ExceptionType() {
}
@@ -115,6 +119,16 @@
// Fluent API
//-------------------------------------------------------------------------
+ public ExceptionType handled(boolean cond) {
+ ConstantLanguage constant = new ConstantLanguage();
+ return handled(constant.createPredicate("true"));
+ }
+
+ public ExceptionType handled(Predicate cond) {
+ setHandledPolicy(cond);
+ return this;
+ }
+
public ExceptionType backOffMultiplier(double backOffMultiplier) {
getOrCreateRedeliveryPolicy().backOffMultiplier(backOffMultiplier);
return this;
@@ -196,6 +210,14 @@
this.redeliveryPolicy = redeliveryPolicy;
}
+ public Predicate getHandledPolicy() {
+ return handledPolicy;
+ }
+
+ public void setHandledPolicy(Predicate handledPolicy) {
+ this.handledPolicy = handledPolicy;
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
protected RedeliveryPolicyType getOrCreateRedeliveryPolicy() {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Oct 10 18:49:53 2008
@@ -16,19 +16,14 @@
*/
package org.apache.camel.processor;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
import java.util.concurrent.RejectedExecutionException;
-import javax.xml.transform.Source;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.converter.stream.StreamCache;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ExceptionType;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
@@ -50,6 +45,15 @@
public static final String REDELIVERED = "org.apache.camel.Redelivered";
public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
+
+ private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
+ private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
+ private Processor output;
+ private Processor deadLetter;
+ private AsyncProcessor outputAsync;
+ private RedeliveryPolicy redeliveryPolicy;
+ private Logger logger;
+
private class RedeliveryData {
int redeliveryCounter;
long redeliveryDelay;
@@ -58,16 +62,9 @@
// default behaviour which can be overloaded on a per exception basis
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
Processor failureProcessor = deadLetter;
+ Predicate handledPredicate = null;
}
- private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
- private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
- private Processor output;
- private Processor deadLetter;
- private AsyncProcessor outputAsync;
- private RedeliveryPolicy redeliveryPolicy;
- private Logger logger;
-
public DeadLetterChannel(Processor output, Processor deadLetter) {
this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
@@ -130,6 +127,7 @@
ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
if (exceptionPolicy != null) {
data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
+ data.handledPredicate = exceptionPolicy.getHandledPolicy();
Processor processor = exceptionPolicy.getErrorHandler();
if (processor != null) {
data.failureProcessor = processor;
@@ -140,7 +138,7 @@
// should we redeliver or not?
if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
// we did not success with the redelivery so now we let the failure processor handle it
- setFailureHandled(exchange, true);
+ setFailureHandled(exchange);
// must decrement the redelivery counter as we didn't process the redelivery but is
// handling by the failure handler. So we must -1 to not let the counter be out-of-sync
decrementRedeliveryCounter(exchange);
@@ -148,12 +146,13 @@
AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
boolean sync = afp.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
- restoreExceptionOnExchange(exchange);
+ restoreExceptionOnExchange(exchange, data.handledPredicate);
callback.done(data.sync);
}
});
- restoreExceptionOnExchange(exchange);
+ // The line below shouldn't be needed, it is invoked by the AsyncCallback above
+ //restoreExceptionOnExchange(exchange, data.handledPredicate);
logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor);
return sync;
}
@@ -169,7 +168,6 @@
data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
}
-
// process the exchange
boolean sync = outputAsync.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
@@ -203,19 +201,18 @@
return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
}
- public static void setFailureHandled(Exchange exchange, boolean isHandled) {
- if (isHandled) {
- exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
- exchange.setException(null);
- } else {
- exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
- exchange.removeProperty(FAILURE_HANDLED_PROPERTY);
- }
-
+ public static void setFailureHandled(Exchange exchange) {
+ exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
+ exchange.setException(null);
}
- public static void restoreExceptionOnExchange(Exchange exchange) {
- exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+ protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
+ if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+ // exception not handled, put exception back in the exchange
+ exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+ } else {
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
+ }
}
public void process(Exchange exchange) throws Exception {
@@ -300,7 +297,6 @@
}
}
-
@Override
protected void doStart() throws Exception {
ServiceHelper.startServices(output, deadLetter);
@@ -310,5 +306,4 @@
protected void doStop() throws Exception {
ServiceHelper.stopServices(deadLetter, output);
}
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Fri Oct 10 18:49:53 2008
@@ -62,11 +62,16 @@
Exchange nextExchange = original;
boolean first = true;
while (true) {
- if (nextExchange.isFailed()) {
+ boolean handledException = Boolean.TRUE.equals(
+ nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
+ if (nextExchange.isFailed() || handledException) {
+ // The Exchange.EXCEPTION_HANDLED_PROPERTY property is only set if satisfactory handling was done
+ // by the error handler. It's still an exception, the exchange still failed.
if (LOG.isDebugEnabled()) {
LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
+ " exception: " + nextExchange.getException() + " fault: "
- + nextExchange.getFault(false));
+ + nextExchange.getFault(false)
+ + (handledException ? " handled by the error handler" : ""));
}
break;
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java Fri Oct 10 18:49:53 2008
@@ -35,11 +35,14 @@
public class ExceptionBuilderTest extends ContextTestSupport {
private static final String MESSAGE_INFO = "messageInfo";
+ private static final String RESULT_QUEUE = "mock:result";
private static final String ERROR_QUEUE = "mock:error";
private static final String BUSINESS_ERROR_QUEUE = "mock:badBusiness";
private static final String SECURITY_ERROR_QUEUE = "mock:securityError";
public void testNPE() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm a NPE");
@@ -52,10 +55,12 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public void testIOException() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm somekind of IO exception");
@@ -68,10 +73,12 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public void testException() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm just exception");
@@ -84,10 +91,12 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public void testMyBusinessException() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(BUSINESS_ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm my business is not going to well");
@@ -100,11 +109,13 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public void testSecurityConfiguredWithTwoExceptions() throws Exception {
// test that we also handles a configuration with 2 or more exceptions
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(SECURITY_ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some security error");
@@ -117,11 +128,13 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public void testSecurityConfiguredWithExceptionList() throws Exception {
// test that we also handles a configuration with a list of exceptions
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some access error");
@@ -134,7 +147,7 @@
// expected
}
- mock.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(result, mock);
}
public static class MyBaseBusinessException extends Exception {
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java?rev=703622&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java Fri Oct 10 18:49:53 2008
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.io.IOException;
+import java.net.ConnectException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to test exception configuration
+ */
+public class ExceptionBuilderWithHandledExceptionTest extends ContextTestSupport {
+
+ private static final String MESSAGE_INFO = "messageInfo";
+ private static final String RESULT_QUEUE = "mock:result";
+ private static final String ERROR_QUEUE = "mock:error";
+
+ public void testHandledException() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
+ MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with NullPointerException");
+
+ template.sendBody("direct:a", "Hello NPE");
+ MockEndpoint.assertIsSatisfied(result, mock);
+ }
+
+ public void testHandledExceptionWithExpression() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
+ MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException");
+
+ template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "bar");
+ MockEndpoint.assertIsSatisfied(result, mock);
+ }
+
+ public void testUnhandledException() throws Exception {
+ MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+ result.expectedMessageCount(0);
+ MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException");
+
+ try {
+ template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "something that does not match");
+ fail("Should have thrown a IOException");
+ } catch (RuntimeCamelException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ // expected, failure is not handled because predicate doesn't match
+ }
+
+ MockEndpoint.assertIsSatisfied(result, mock);
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: exceptionBuilder1
+ onException(NullPointerException.class)
+ .maximumRedeliveries(0)
+ .handled(true)
+ .setHeader(MESSAGE_INFO, constant("Handled exchange with NullPointerException"))
+ .to(ERROR_QUEUE);
+
+ onException(IOException.class)
+ .maximumRedeliveries(0)
+ .handled(header("foo").isEqualTo("bar"))
+ .setHeader(MESSAGE_INFO, constant("Handled exchange with IOException"))
+ .to(ERROR_QUEUE);
+ // END SNIPPET: exceptionBuilder1
+
+ from("direct:a").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String s = exchange.getIn().getBody(String.class);
+ if ("Hello NPE".equals(s)) {
+ throw new NullPointerException();
+ } else if ("Hello IOE".equals(s)) {
+ // specialized IOException
+ throw new ConnectException("Forced for testing - can not connect to remote server");
+ }
+ exchange.getOut().setBody("Hello World");
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+}