You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/07 09:20:57 UTC

[1/2] camel git commit: CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also

Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 66b436fb7 -> 3660437ae
  refs/heads/master 59c8a9e6a -> 7944093fd


CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7944093f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7944093f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7944093f

Branch: refs/heads/master
Commit: 7944093fdcbfca12cc14afba93a674e81a882fc8
Parents: 59c8a9e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 7 09:19:50 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 7 09:19:56 2016 +0100

----------------------------------------------------------------------
 .../camel/processor/FinallyProcessor.java       | 90 +++++++++++++++-----
 .../camel/processor/TrySetFaultFinallyTest.java | 52 +++++++++++
 2 files changed, 119 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7944093f/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index b04e172..4fe21a6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -41,37 +41,30 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // clear exception so finally block can be executed
-        final Exception e = exchange.getException();
+        // clear exception and fault so finally block can be executed
+        final boolean fault;
+        if (exchange.hasOut()) {
+            fault = exchange.getOut().isFault();
+            exchange.getOut().setFault(false);
+        } else {
+            fault = exchange.getIn().isFault();
+            exchange.getIn().setFault(false);
+        }
+
+        final Exception exception = exchange.getException();
         exchange.setException(null);
         // but store the caught exception as a property
-        if (e != null) {
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+        if (exception != null) {
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
         }
+
         // store the last to endpoint as the failure endpoint
         if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
             exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
         }
 
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (e == null) {
-                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
-                } else {
-                    // set exception back on exchange
-                    exchange.setException(e);
-                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-                }
-
-                if (!doneSync) {
-                    // signal callback to continue routing async
-                    ExchangeHelper.prepareOutToIn(exchange);
-                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                }
-                callback.done(doneSync);
-            }
-        });
-        return sync;
+        // continue processing
+        return processor.process(exchange, new FinallyAsyncCallback(exchange, callback, exception, fault));
     }
 
     @Override
@@ -90,4 +83,55 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
     public void setId(String id) {
         this.id = id;
     }
+
+    private static final class FinallyAsyncCallback implements AsyncCallback {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final Exception exception;
+        private final boolean fault;
+
+        public FinallyAsyncCallback(Exchange exchange, AsyncCallback callback, Exception exception, boolean fault) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.exception = exception;
+            this.fault = fault;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                if (exception == null) {
+                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
+                } else {
+                    // set exception back on exchange
+                    exchange.setException(exception);
+                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+                }
+                // set fault flag back
+                if (fault) {
+                    if (exchange.hasOut()) {
+                        exchange.getOut().setFault(true);
+                    } else {
+                        exchange.getIn().setFault(true);
+                    }
+                }
+
+                if (!doneSync) {
+                    // signal callback to continue routing async
+                    ExchangeHelper.prepareOutToIn(exchange);
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
+            } finally {
+                // callback must always be called
+                callback.done(doneSync);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "FinallyAsyncCallback";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7944093f/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
new file mode 100644
index 0000000..4413f9b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class TrySetFaultFinallyTest extends ContextTestSupport {
+
+    public void testSetFaultFinally() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        template.requestBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .doTry()
+                        .to("mock:a")
+                        .setFaultBody(constant("Failed at A"))
+                    .doFinally()
+                        .to("mock:b")
+                        .to("mock:c")
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+}


[2/2] camel git commit: CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also

Posted by da...@apache.org.
CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3660437a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3660437a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3660437a

Branch: refs/heads/camel-2.16.x
Commit: 3660437ae50128665fa2dea0b76d93dff242c0f8
Parents: 66b436f
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 7 09:19:50 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 7 09:20:16 2016 +0100

----------------------------------------------------------------------
 .../camel/processor/FinallyProcessor.java       | 90 +++++++++++++++-----
 .../camel/processor/TrySetFaultFinallyTest.java | 52 +++++++++++
 2 files changed, 119 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3660437a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index b04e172..4fe21a6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -41,37 +41,30 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // clear exception so finally block can be executed
-        final Exception e = exchange.getException();
+        // clear exception and fault so finally block can be executed
+        final boolean fault;
+        if (exchange.hasOut()) {
+            fault = exchange.getOut().isFault();
+            exchange.getOut().setFault(false);
+        } else {
+            fault = exchange.getIn().isFault();
+            exchange.getIn().setFault(false);
+        }
+
+        final Exception exception = exchange.getException();
         exchange.setException(null);
         // but store the caught exception as a property
-        if (e != null) {
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+        if (exception != null) {
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
         }
+
         // store the last to endpoint as the failure endpoint
         if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
             exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
         }
 
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (e == null) {
-                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
-                } else {
-                    // set exception back on exchange
-                    exchange.setException(e);
-                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-                }
-
-                if (!doneSync) {
-                    // signal callback to continue routing async
-                    ExchangeHelper.prepareOutToIn(exchange);
-                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                }
-                callback.done(doneSync);
-            }
-        });
-        return sync;
+        // continue processing
+        return processor.process(exchange, new FinallyAsyncCallback(exchange, callback, exception, fault));
     }
 
     @Override
@@ -90,4 +83,55 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
     public void setId(String id) {
         this.id = id;
     }
+
+    private static final class FinallyAsyncCallback implements AsyncCallback {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final Exception exception;
+        private final boolean fault;
+
+        public FinallyAsyncCallback(Exchange exchange, AsyncCallback callback, Exception exception, boolean fault) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.exception = exception;
+            this.fault = fault;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                if (exception == null) {
+                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
+                } else {
+                    // set exception back on exchange
+                    exchange.setException(exception);
+                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+                }
+                // set fault flag back
+                if (fault) {
+                    if (exchange.hasOut()) {
+                        exchange.getOut().setFault(true);
+                    } else {
+                        exchange.getIn().setFault(true);
+                    }
+                }
+
+                if (!doneSync) {
+                    // signal callback to continue routing async
+                    ExchangeHelper.prepareOutToIn(exchange);
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
+            } finally {
+                // callback must always be called
+                callback.done(doneSync);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "FinallyAsyncCallback";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3660437a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
new file mode 100644
index 0000000..4413f9b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class TrySetFaultFinallyTest extends ContextTestSupport {
+
+    public void testSetFaultFinally() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        template.requestBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .doTry()
+                        .to("mock:a")
+                        .setFaultBody(constant("Failed at A"))
+                    .doFinally()
+                        .to("mock:b")
+                        .to("mock:c")
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+}