You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/07 09:20:57 UTC
[1/2] camel git commit: CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x 66b436fb7 -> 3660437ae
refs/heads/master 59c8a9e6a -> 7944093fd
CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7944093f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7944093f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7944093f
Branch: refs/heads/master
Commit: 7944093fdcbfca12cc14afba93a674e81a882fc8
Parents: 59c8a9e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 7 09:19:50 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 7 09:19:56 2016 +0100
----------------------------------------------------------------------
.../camel/processor/FinallyProcessor.java | 90 +++++++++++++++-----
.../camel/processor/TrySetFaultFinallyTest.java | 52 +++++++++++
2 files changed, 119 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7944093f/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index b04e172..4fe21a6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -41,37 +41,30 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- // clear exception so finally block can be executed
- final Exception e = exchange.getException();
+ // clear exception and fault so finally block can be executed
+ final boolean fault;
+ if (exchange.hasOut()) {
+ fault = exchange.getOut().isFault();
+ exchange.getOut().setFault(false);
+ } else {
+ fault = exchange.getIn().isFault();
+ exchange.getIn().setFault(false);
+ }
+
+ final Exception exception = exchange.getException();
exchange.setException(null);
// but store the caught exception as a property
- if (e != null) {
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+ if (exception != null) {
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
}
+
// store the last to endpoint as the failure endpoint
if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
}
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (e == null) {
- exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
- } else {
- // set exception back on exchange
- exchange.setException(e);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
- }
-
- if (!doneSync) {
- // signal callback to continue routing async
- ExchangeHelper.prepareOutToIn(exchange);
- LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- }
- callback.done(doneSync);
- }
- });
- return sync;
+ // continue processing
+ return processor.process(exchange, new FinallyAsyncCallback(exchange, callback, exception, fault));
}
@Override
@@ -90,4 +83,55 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
public void setId(String id) {
this.id = id;
}
+
+ private static final class FinallyAsyncCallback implements AsyncCallback {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final Exception exception;
+ private final boolean fault;
+
+ public FinallyAsyncCallback(Exchange exchange, AsyncCallback callback, Exception exception, boolean fault) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.exception = exception;
+ this.fault = fault;
+ }
+
+ @Override
+ public void done(boolean doneSync) {
+ try {
+ if (exception == null) {
+ exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
+ } else {
+ // set exception back on exchange
+ exchange.setException(exception);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+ }
+ // set fault flag back
+ if (fault) {
+ if (exchange.hasOut()) {
+ exchange.getOut().setFault(true);
+ } else {
+ exchange.getIn().setFault(true);
+ }
+ }
+
+ if (!doneSync) {
+ // signal callback to continue routing async
+ ExchangeHelper.prepareOutToIn(exchange);
+ LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ }
+ } finally {
+ // callback must always be called
+ callback.done(doneSync);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FinallyAsyncCallback";
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7944093f/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
new file mode 100644
index 0000000..4413f9b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class TrySetFaultFinallyTest extends ContextTestSupport {
+
+ public void testSetFaultFinally() throws Exception {
+ getMockEndpoint("mock:a").expectedMessageCount(1);
+ getMockEndpoint("mock:b").expectedMessageCount(1);
+ getMockEndpoint("mock:c").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.requestBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .doTry()
+ .to("mock:a")
+ .setFaultBody(constant("Failed at A"))
+ .doFinally()
+ .to("mock:b")
+ .to("mock:c")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}
[2/2] camel git commit: CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also
Posted by da...@apache.org.
CAMEL-9673: doTry .. doFinally should run the finally block for fault messages also
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3660437a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3660437a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3660437a
Branch: refs/heads/camel-2.16.x
Commit: 3660437ae50128665fa2dea0b76d93dff242c0f8
Parents: 66b436f
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 7 09:19:50 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 7 09:20:16 2016 +0100
----------------------------------------------------------------------
.../camel/processor/FinallyProcessor.java | 90 +++++++++++++++-----
.../camel/processor/TrySetFaultFinallyTest.java | 52 +++++++++++
2 files changed, 119 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3660437a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index b04e172..4fe21a6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -41,37 +41,30 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- // clear exception so finally block can be executed
- final Exception e = exchange.getException();
+ // clear exception and fault so finally block can be executed
+ final boolean fault;
+ if (exchange.hasOut()) {
+ fault = exchange.getOut().isFault();
+ exchange.getOut().setFault(false);
+ } else {
+ fault = exchange.getIn().isFault();
+ exchange.getIn().setFault(false);
+ }
+
+ final Exception exception = exchange.getException();
exchange.setException(null);
// but store the caught exception as a property
- if (e != null) {
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+ if (exception != null) {
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
}
+
// store the last to endpoint as the failure endpoint
if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
}
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (e == null) {
- exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
- } else {
- // set exception back on exchange
- exchange.setException(e);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
- }
-
- if (!doneSync) {
- // signal callback to continue routing async
- ExchangeHelper.prepareOutToIn(exchange);
- LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- }
- callback.done(doneSync);
- }
- });
- return sync;
+ // continue processing
+ return processor.process(exchange, new FinallyAsyncCallback(exchange, callback, exception, fault));
}
@Override
@@ -90,4 +83,55 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
public void setId(String id) {
this.id = id;
}
+
+ private static final class FinallyAsyncCallback implements AsyncCallback {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final Exception exception;
+ private final boolean fault;
+
+ public FinallyAsyncCallback(Exchange exchange, AsyncCallback callback, Exception exception, boolean fault) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.exception = exception;
+ this.fault = fault;
+ }
+
+ @Override
+ public void done(boolean doneSync) {
+ try {
+ if (exception == null) {
+ exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
+ } else {
+ // set exception back on exchange
+ exchange.setException(exception);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+ }
+ // set fault flag back
+ if (fault) {
+ if (exchange.hasOut()) {
+ exchange.getOut().setFault(true);
+ } else {
+ exchange.getIn().setFault(true);
+ }
+ }
+
+ if (!doneSync) {
+ // signal callback to continue routing async
+ ExchangeHelper.prepareOutToIn(exchange);
+ LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ }
+ } finally {
+ // callback must always be called
+ callback.done(doneSync);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FinallyAsyncCallback";
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3660437a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
new file mode 100644
index 0000000..4413f9b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/TrySetFaultFinallyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class TrySetFaultFinallyTest extends ContextTestSupport {
+
+ public void testSetFaultFinally() throws Exception {
+ getMockEndpoint("mock:a").expectedMessageCount(1);
+ getMockEndpoint("mock:b").expectedMessageCount(1);
+ getMockEndpoint("mock:c").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.requestBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .doTry()
+ .to("mock:a")
+ .setFaultBody(constant("Failed at A"))
+ .doFinally()
+ .to("mock:b")
+ .to("mock:c")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}