You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/19 08:50:19 UTC
[2/3] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix
CAMEL-9879: Circuit Breaker EIP - That is using hystrix
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39663b27
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39663b27
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39663b27
Branch: refs/heads/hys
Commit: 39663b279eb48d64155bea3654f5ca53f027171b
Parents: 22d8371
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 08:42:07 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 08:42:07 2016 +0200
----------------------------------------------------------------------
.../component/hystrix/HystrixProcessor.java | 37 +++++++++++---
.../hystrix/HystrixProcessorCommand.java | 50 ++++++++++++------
.../hystrix/HystrixRouteFallbackTest.java | 2 +-
.../camel/component/hystrix/TryCatchTest.java | 53 ++++++++++++++++++++
.../src/test/resources/log4j.properties | 2 +-
5 files changed, 121 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/39663b27/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessor.java
index 81b09e4..495706f 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessor.java
@@ -88,18 +88,22 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
// run this as if we run inside try .. catch so there is no regular Camel error handler
exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+ // use our callback that does some cleanup when done
+ AsyncCallback hystrixCallback = new HystrixAsyncCallback(exchange, callback);
+
HystrixCommandGroupKey key = HystrixCommandGroupKey.Factory.asKey(id);
- HystrixProcessorCommand command = new HystrixProcessorCommand(key, exchange, callback, processor, fallback);
+ HystrixProcessorCommand command = new HystrixProcessorCommand(key, exchange, hystrixCallback, processor, fallback);
try {
- command.execute();
+ command.queue();
} catch (Throwable e) {
+ // error adding to queue, so set as error and we are done
exchange.setException(e);
+ exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
+ callback.done(true);
+ return true;
}
- exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
-
- callback.done(true);
- return true;
+ return false;
}
@Override
@@ -111,4 +115,25 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
protected void doStop() throws Exception {
// noop
}
+
+ private static final class HystrixAsyncCallback implements AsyncCallback {
+
+ private final Exchange exchange;
+ private final AsyncCallback delegate;
+
+ public HystrixAsyncCallback(Exchange exchange, AsyncCallback delegate) {
+ this.exchange = exchange;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void done(boolean doneSync) {
+ if (doneSync) {
+ return;
+ }
+ // we are only done when called with false
+ exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
+ delegate.done(doneSync);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/39663b27/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessorCommand.java
index 66a8150..299819e 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProcessorCommand.java
@@ -21,12 +21,15 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Hystrix Command for the Camel Hystrix EIP.
*/
public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
+ private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class);
private final Exchange exchange;
private final AsyncCallback callback;
private final AsyncProcessor processor;
@@ -43,46 +46,63 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
@Override
protected Exchange getFallback() {
- if (fallback != null) {
- try {
- Exception e = exchange.getException();
+ // only run fallback if there was an exception
+ Exception exception = exchange.getException();
+ if (exception == null) {
+ return exchange;
+ }
+
+ try {
+ if (fallback != null) {
// store the last to endpoint as the failure endpoint
if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
}
// give the rest of the pipeline another chance
exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
exchange.setException(null);
// and we should not be regarded as exhausted as we are in a try .. catch block
exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-
- fallback.process(exchange, callback);
- } catch (Exception e) {
- exchange.setException(e);
- } finally {
- callback.done(true);
+ // run the fallback processor
+ try {
+ LOG.debug("Running fallback: {}", exchange);
+ fallback.process(exchange, callback);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
}
- return exchange;
- } else {
- return null;
+ } finally {
+ LOG.debug("Running fallback: {} success", exchange);
+ callback.done(false);
}
+
+ return exchange;
}
@Override
protected Exchange run() throws Exception {
+ LOG.debug("Running processor: {}", exchange);
+
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
+ exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
try {
processor.process(exchange, callback);
} catch (Exception e) {
exchange.setException(e);
- } finally {
- callback.done(true);
}
// if we failed then throw an exception
if (exchange.getException() != null) {
throw exchange.getException();
}
+ // no errors we are done
+ try {
+ LOG.debug("Running processor: {} success", exchange);
+ callback.done(false);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
return exchange;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/39663b27/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixRouteFallbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixRouteFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixRouteFallbackTest.java
index 43aa817..dc7c766 100644
--- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixRouteFallbackTest.java
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixRouteFallbackTest.java
@@ -44,7 +44,7 @@ public class HystrixRouteFallbackTest extends CamelTestSupport {
.end()
.to("mock:result");
- from("direct:foo")
+ from("direct:foo").errorHandler(noErrorHandler())
.throwException(new IllegalArgumentException("Forced"));
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/39663b27/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/TryCatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/TryCatchTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/TryCatchTest.java
new file mode 100644
index 0000000..089b3b8
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/TryCatchTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class TryCatchTest extends CamelTestSupport {
+
+ @Test
+ public void testTryCatch() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .doTry()
+ .to("direct:foo")
+ .doCatch(Exception.class)
+ .transform().constant("Fallback message")
+ .end()
+ .to("mock:result");
+
+ from("direct:foo").errorHandler(noErrorHandler())
+ .throwException(new IllegalArgumentException("Forced"));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/39663b27/components/camel-hystrix/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/log4j.properties b/components/camel-hystrix/src/test/resources/log4j.properties
index be664b3..2d541ed 100644
--- a/components/camel-hystrix/src/test/resources/log4j.properties
+++ b/components/camel-hystrix/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
#
# The logging properties used for testing.
#
-log4j.rootLogger=DEBUG, file
+log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
#log4j.logger.org.apache.camel.component.hystrix=DEBUG