You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <ci...@silverbullet.dk> on 2008/10/11 11:19:03 UTC

RE: svn commit: r703622 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/builder/

Hi Hadrian

This one must be a typo in ExceptionType.handled(boolean):
+    public ExceptionType handled(boolean cond) {
+        ConstantLanguage constant = new ConstantLanguage();
+        return handled(constant.createPredicate("true"));
+    }

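The boolean cond parameter is not used! The predicate is always created from the constant "true", so handled(false) behaves the same as handled(true).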



Med venlig hilsen
 
Claus Ibsen
......................................
Silverbullet
Skovsgårdsvænget 21
8362 Hørning
Tlf. +45 2962 7576
Web: www.silverbullet.dk
-----Original Message-----
From: hadrian@apache.org [mailto:hadrian@apache.org] 
Sent: 11. oktober 2008 03:50
To: camel-commits@activemq.apache.org
Subject: svn commit: r703622 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/builder/

Author: hadrian
Date: Fri Oct 10 18:49:53 2008
New Revision: 703622

URL: http://svn.apache.org/viewvc?rev=703622&view=rev
Log:
CAMEL-960

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Oct 10 18:49:53 2008
@@ -34,6 +34,7 @@
 
     String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";
 
+    String EXCEPTION_HANDLED_PROPERTY = "CamelExceptionHandled";
     /**
      * Returns the {@link ExchangePattern} (MEP) of this exchange.
      *

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java Fri Oct 10 18:49:53 2008
@@ -27,9 +27,11 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.language.constant.ConstantLanguage;
 import org.apache.camel.processor.CatchProcessor;
 import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.spi.RouteContext;
@@ -46,7 +48,7 @@
 
     @XmlElement(name = "exception")
     private List<String> exceptions = new ArrayList<String>();
-    @XmlElement(name = "redeliveryPolicy", required = false)
+     @XmlElement(name = "redeliveryPolicy", required = false)
     private RedeliveryPolicyType redeliveryPolicy;
     @XmlElementRef
     private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
@@ -54,6 +56,8 @@
     private List<Class> exceptionClasses;
     @XmlTransient
     private Processor errorHandler;
+    @XmlTransient
+    private Predicate handledPolicy;
 
     public ExceptionType() {
     }
@@ -115,6 +119,16 @@
 
     // Fluent API
     //-------------------------------------------------------------------------
+    public ExceptionType handled(boolean cond) {
+        ConstantLanguage constant = new ConstantLanguage();
+        return handled(constant.createPredicate("true"));
+    }
+    
+    public ExceptionType handled(Predicate cond) {
+        setHandledPolicy(cond);
+        return this;
+    }
+    
     public ExceptionType backOffMultiplier(double backOffMultiplier) {
         getOrCreateRedeliveryPolicy().backOffMultiplier(backOffMultiplier);
         return this;
@@ -196,6 +210,14 @@
         this.redeliveryPolicy = redeliveryPolicy;
     }
 
+    public Predicate getHandledPolicy() {
+        return handledPolicy;
+    }
+
+    public void setHandledPolicy(Predicate handledPolicy) {
+        this.handledPolicy = handledPolicy;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
     protected RedeliveryPolicyType getOrCreateRedeliveryPolicy() {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Oct 10 18:49:53 2008
@@ -16,19 +16,14 @@
  */
 package org.apache.camel.processor;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
 import java.util.concurrent.RejectedExecutionException;
 
-import javax.xml.transform.Source;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.converter.stream.StreamCache;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ExceptionType;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
@@ -50,6 +45,15 @@
     public static final String REDELIVERED = "org.apache.camel.Redelivered";
     public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
 
+
+    private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
+    private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
+    private Processor output;
+    private Processor deadLetter;
+    private AsyncProcessor outputAsync;
+    private RedeliveryPolicy redeliveryPolicy;
+    private Logger logger;
+
     private class RedeliveryData {
         int redeliveryCounter;
         long redeliveryDelay;
@@ -58,16 +62,9 @@
         // default behaviour which can be overloaded on a per exception basis
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
         Processor failureProcessor = deadLetter;
+        Predicate handledPredicate = null;
     }
 
-    private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
-    private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
-    private Processor output;
-    private Processor deadLetter;
-    private AsyncProcessor outputAsync;
-    private RedeliveryPolicy redeliveryPolicy;
-    private Logger logger;
-
     public DeadLetterChannel(Processor output, Processor deadLetter) {
         this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
             ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
@@ -130,6 +127,7 @@
                 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
                 if (exceptionPolicy != null) {
                     data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
+                    data.handledPredicate = exceptionPolicy.getHandledPolicy();
                     Processor processor = exceptionPolicy.getErrorHandler();
                     if (processor != null) {
                         data.failureProcessor = processor;
@@ -140,7 +138,7 @@
             // should we redeliver or not?
             if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
                 // we did not success with the redelivery so now we let the failure processor handle it
-                setFailureHandled(exchange, true);
+                setFailureHandled(exchange);
                 // must decrement the redelivery counter as we didn't process the redelivery but is
                 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
                 decrementRedeliveryCounter(exchange);
@@ -148,12 +146,13 @@
                 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
                 boolean sync = afp.process(exchange, new AsyncCallback() {
                     public void done(boolean sync) {
-                        restoreExceptionOnExchange(exchange);
+                        restoreExceptionOnExchange(exchange, data.handledPredicate);
                         callback.done(data.sync);
                     }
                 });
 
-                restoreExceptionOnExchange(exchange);
+                // The line below shouldn't be needed, it is invoked by the AsyncCallback above
+                //restoreExceptionOnExchange(exchange, data.handledPredicate);
                 logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor);
                 return sync;
             }
@@ -169,7 +168,6 @@
                 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
             }
 
-
             // process the exchange
             boolean sync = outputAsync.process(exchange, new AsyncCallback() {
                 public void done(boolean sync) {
@@ -203,19 +201,18 @@
         return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
     }
 
-    public static void setFailureHandled(Exchange exchange, boolean isHandled) {
-        if (isHandled) {
-            exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
-            exchange.setException(null);
-        } else {
-            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
-            exchange.removeProperty(FAILURE_HANDLED_PROPERTY);
-        }
-
+    public static void setFailureHandled(Exchange exchange) {
+        exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
+        exchange.setException(null);
     }
 
-    public static void restoreExceptionOnExchange(Exchange exchange) {
-        exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+    protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
+        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+            // exception not handled, put exception back in the exchange
+            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
+        } else {
+            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
+        }
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -300,7 +297,6 @@
         }
     }
 
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, deadLetter);
@@ -310,5 +306,4 @@
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(deadLetter, output);
     }
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Fri Oct 10 18:49:53 2008
@@ -62,11 +62,16 @@
         Exchange nextExchange = original;
         boolean first = true;
         while (true) {
-            if (nextExchange.isFailed()) {
+            boolean handledException = Boolean.TRUE.equals(
+                    nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
+            if (nextExchange.isFailed() || handledException) {
+                // The Exchange.EXCEPTION_HANDLED_PROPERTY property is only set if satisfactory handling was done 
+                //  by the error handler.  It's still an exception, the exchange still failed.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
                               + " exception: " + nextExchange.getException() + " fault: "
-                              + nextExchange.getFault(false));
+                              + nextExchange.getFault(false)
+                              + (handledException ? " handled by the error handler" : ""));
                 }
                 break;
             }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java?rev=703622&r1=703621&r2=703622&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java Fri Oct 10 18:49:53 2008
@@ -35,11 +35,14 @@
 public class ExceptionBuilderTest extends ContextTestSupport {
 
     private static final String MESSAGE_INFO = "messageInfo";
+    private static final String RESULT_QUEUE = "mock:result";
     private static final String ERROR_QUEUE = "mock:error";
     private static final String BUSINESS_ERROR_QUEUE = "mock:badBusiness";
     private static final String SECURITY_ERROR_QUEUE = "mock:securityError";
 
     public void testNPE() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm a NPE");
@@ -52,10 +55,12 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public void testIOException() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm somekind of IO exception");
@@ -68,10 +73,12 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public void testException() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm just exception");
@@ -84,10 +91,12 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public void testMyBusinessException() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(BUSINESS_ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm my business is not going to well");
@@ -100,11 +109,13 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public void testSecurityConfiguredWithTwoExceptions() throws Exception {
         // test that we also handles a configuration with 2 or more exceptions
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(SECURITY_ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some security error");
@@ -117,11 +128,13 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public void testSecurityConfiguredWithExceptionList() throws Exception {
         // test that we also handles a configuration with a list of exceptions
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
         MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some access error");
@@ -134,7 +147,7 @@
             // expected
         }
 
-        mock.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(result, mock);
     }
 
     public static class MyBaseBusinessException extends Exception {

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java?rev=703622&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java Fri Oct 10 18:49:53 2008
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.io.IOException;
+import java.net.ConnectException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to test exception configuration
+ */
+public class ExceptionBuilderWithHandledExceptionTest extends ContextTestSupport {
+
+    private static final String MESSAGE_INFO = "messageInfo";
+    private static final String RESULT_QUEUE = "mock:result";
+    private static final String ERROR_QUEUE = "mock:error";
+
+    public void testHandledException() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
+        MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with NullPointerException");
+
+        template.sendBody("direct:a", "Hello NPE");
+        MockEndpoint.assertIsSatisfied(result, mock);
+    }
+
+    public void testHandledExceptionWithExpression() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
+        MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException");
+
+        template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "bar");
+        MockEndpoint.assertIsSatisfied(result, mock);
+    }
+
+    public void testUnhandledException() throws Exception {
+        MockEndpoint result = getMockEndpoint(RESULT_QUEUE);
+        result.expectedMessageCount(0);
+        MockEndpoint mock = getMockEndpoint(ERROR_QUEUE);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException");
+        
+        try {
+            template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "something that does not match");
+            fail("Should have thrown a IOException");
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof IOException);
+            // expected, failure is not handled because predicate doesn't match
+        }
+
+        MockEndpoint.assertIsSatisfied(result, mock);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: exceptionBuilder1
+                onException(NullPointerException.class)
+                    .maximumRedeliveries(0)
+                    .handled(true)
+                    .setHeader(MESSAGE_INFO, constant("Handled exchange with NullPointerException"))
+                    .to(ERROR_QUEUE);
+
+                onException(IOException.class)
+                    .maximumRedeliveries(0)
+                    .handled(header("foo").isEqualTo("bar"))
+                    .setHeader(MESSAGE_INFO, constant("Handled exchange with IOException"))
+                    .to(ERROR_QUEUE);
+                // END SNIPPET: exceptionBuilder1
+
+                from("direct:a").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String s = exchange.getIn().getBody(String.class);
+                        if ("Hello NPE".equals(s)) {
+                            throw new NullPointerException();
+                        } else if ("Hello IOE".equals(s)) {
+                            // specialized IOException
+                            throw new ConnectException("Forced for testing - can not connect to remote server");
+                        }
+                        exchange.getOut().setBody("Hello World");
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+}