You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/19 19:28:10 UTC
[1/2] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix.
Repository: camel
Updated Branches:
refs/heads/master 62bbf9b23 -> 41df4acb2
CAMEL-9879: Circuit Breaker EIP - That is using hystrix.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8732df93
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8732df93
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8732df93
Branch: refs/heads/master
Commit: 8732df935968b77720002ee347f84536be4bd481
Parents: 62bbf9b
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 18:56:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 18:56:35 2016 +0200
----------------------------------------------------------------------
.../hystrix/processor/HystrixProcessor.java | 27 +++++++++++++++++---
.../processor/HystrixProcessorCommand.java | 26 +++++++------------
2 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8732df93/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index 1b497bf..f59c1cf 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -24,6 +24,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.Message;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.spi.IdAware;
@@ -88,10 +89,30 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
}
@Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ // run this as if we run inside try .. catch so there is no regular Camel error handler
+ exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+
try {
- command.queue();
+ // create command
+ HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+
+ // execute the command asynchronous and observe when its done
+ command.observe().subscribe((msg) -> {
+ if (command.isResponseFromCache()) {
+ // its from cache so need to copy it into the exchange
+ Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ target.copyFrom(msg);
+ } else {
+ // if it was not from cache then run/fallback was executed and the result
+ // is already set correctly on the exchange and we do not need to do anything
+ }
+ }, throwable -> {
+ exchange.setException(throwable);
+ }, () -> {
+ exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
+ callback.done(false);
+ });
} catch (Throwable e) {
// error adding to queue, so set as error and we are done
exchange.setException(e);
http://git-wip-us.apache.org/repos/asf/camel/blob/8732df93/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 0ba6158..26243b57 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -21,13 +21,14 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hystrix Command for the Camel Hystrix EIP.
*/
-public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
+public class HystrixProcessorCommand extends HystrixCommand<Message> {
private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class);
private final Exchange exchange;
@@ -60,11 +61,11 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
}
@Override
- protected Exchange getFallback() {
+ protected Message getFallback() {
// only run fallback if there was an exception
Exception exception = exchange.getException();
if (exception == null) {
- return exchange;
+ return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
}
try {
@@ -91,18 +92,15 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
} finally {
LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
- callback.done(false);
}
- return exchange;
+ return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
}
@Override
- protected Exchange run() throws Exception {
+ protected Message run() throws Exception {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
- // run this as if we run inside try .. catch so there is no regular Camel error handler
- exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
try {
processor.process(exchange, callback);
} catch (Exception e) {
@@ -116,15 +114,9 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
throw exchange.getException();
}
- // no fallback then we are done
- try {
- LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
- exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
- callback.done(false);
- } catch (Throwable e) {
- exchange.setException(e);
- }
- return exchange;
+ LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+ // no fallback then we are done
+ return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
}
}
[2/2] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix.
Posted by da...@apache.org.
CAMEL-9879: Circuit Breaker EIP - That is using hystrix.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/41df4acb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/41df4acb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/41df4acb
Branch: refs/heads/master
Commit: 41df4acb2e6e28d81aa9977e08f4f5f922a5b117
Parents: 8732df9
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 19:25:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 19:27:07 2016 +0200
----------------------------------------------------------------------
.../model/HystrixConfigurationDefinition.java | 20 ----------------
.../apache/camel/model/HystrixDefinition.java | 24 --------------------
.../hystrix/processor/HystrixProcessor.java | 7 ++----
.../processor/HystrixProcessorCommand.java | 18 +--------------
.../processor/HystrixProcessorFactory.java | 11 +--------
.../processor/HystrixRouteConfigTest.java | 2 +-
.../SpringHystrixRouteConfigRefTest.xml | 2 +-
.../processor/SpringHystrixRouteConfigTest.xml | 2 +-
8 files changed, 7 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
index d4e9f52..3993f47 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
@@ -103,9 +103,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
private Integer metricsRollingStatisticalWindowBuckets;
@XmlAttribute
@Metadata(label = "command", defaultValue = "true")
- private Boolean requestCacheEnabled;
- @XmlAttribute
- @Metadata(label = "command", defaultValue = "true")
private Boolean requestLogEnabled;
// thread-pool
@@ -315,14 +312,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
}
- public Boolean getRequestCacheEnabled() {
- return requestCacheEnabled;
- }
-
- public void setRequestCacheEnabled(Boolean requestCacheEnabled) {
- this.requestCacheEnabled = requestCacheEnabled;
- }
-
public Boolean getRequestLogEnabled() {
return requestLogEnabled;
}
@@ -594,15 +583,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
}
/**
- * Whether HystrixCommand.getCacheKey() should be used with HystrixRequestCache
- * to provide de-duplication functionality via request-scoped caching.
- */
- public HystrixConfigurationDefinition requestCacheEnabled(Boolean requestCacheEnabled) {
- this.requestCacheEnabled = requestCacheEnabled;
- return this;
- }
-
- /**
* Whether HystrixCommand execution and events should be logged to HystrixRequestLog.
*/
public HystrixConfigurationDefinition requestLogEnabled(Boolean requestLogEnabled) {
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
index 3ca72f5..6741e91 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
@@ -26,7 +26,6 @@ import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
@@ -38,8 +37,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
@XmlElement
private HystrixConfigurationDefinition hystrixConfiguration;
- @XmlElement
- private ExpressionSubElementDefinition cacheKey;
@XmlElementRef
private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
@XmlElement
@@ -136,14 +133,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
this.hystrixConfigurationRef = hystrixConfigurationRef;
}
- public ExpressionSubElementDefinition getCacheKey() {
- return cacheKey;
- }
-
- public void setCacheKey(ExpressionSubElementDefinition cacheKey) {
- this.cacheKey = cacheKey;
- }
-
public FallbackDefinition getFallback() {
return fallback;
}
@@ -198,19 +187,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
}
/**
- * Sets the expression to use for generating the cache key.
- * <p/>
- * Key to be used for request caching.
- * By default this returns null which means "do not cache".
- * To enable caching set an expression that returns a string key uniquely representing the state of a command instance.
- * If multiple command instances in the same request scope match keys then only the first will be executed and all others returned from cache.
- */
- public HystrixDefinition cacheKey(Expression expression) {
- setCacheKey(new ExpressionSubElementDefinition(expression));
- return this;
- }
-
- /**
* Sets the fallback node
*/
public HystrixDefinition fallback() {
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index f59c1cf..3a5e896 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -23,7 +23,6 @@ import com.netflix.hystrix.HystrixCommand;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
@@ -41,13 +40,11 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
private final HystrixCommand.Setter setter;
private final AsyncProcessor processor;
private final AsyncProcessor fallback;
- private final Expression cacheKey;
- public HystrixProcessor(HystrixCommand.Setter setter, Processor processor, Processor fallback, Expression cacheKey) {
+ public HystrixProcessor(HystrixCommand.Setter setter, Processor processor, Processor fallback) {
this.setter = setter;
this.processor = AsyncProcessorConverterHelper.convert(processor);
this.fallback = AsyncProcessorConverterHelper.convert(fallback);
- this.cacheKey = cacheKey;
}
@Override
@@ -95,7 +92,7 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
try {
// create command
- HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+ HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback);
// execute the command asynchronous and observe when its done
command.observe().subscribe((msg) -> {
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 26243b57..0ebc34f 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -35,29 +35,13 @@ public class HystrixProcessorCommand extends HystrixCommand<Message> {
private final AsyncCallback callback;
private final AsyncProcessor processor;
private final AsyncProcessor fallback;
- private final Expression cacheKey;
- public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback, Expression cacheKey) {
+ public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) {
super(setter);
this.exchange = exchange;
this.callback = callback;
this.processor = processor;
this.fallback = fallback;
- this.cacheKey = cacheKey;
- }
-
- @Override
- protected String getCacheKey() {
- // TODO: require https://github.com/Netflix/Hystrix/wiki/How-To-Use#Caching
- if (cacheKey != null) {
- try {
- return cacheKey.evaluate(exchange, String.class);
- } catch (Throwable e) {
- // ignore
- LOG.debug("Error evaluating cache key. This exception is ignored.", e);
- }
- }
- return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
index 26ea912..fb9aca6 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
@@ -96,13 +96,7 @@ public class HystrixProcessorFactory implements ProcessorFactory {
configureHystrix(command, threadPool, config);
}
- // optional cache-key from expression
- Expression cacheKey = null;
- if (cb.getCacheKey() != null) {
- cacheKey = cb.getCacheKey().createExpression(routeContext);
- }
-
- return new HystrixProcessor(setter, processor, fallback, cacheKey);
+ return new HystrixProcessor(setter, processor, fallback);
} else {
return null;
}
@@ -170,9 +164,6 @@ public class HystrixProcessorFactory implements ProcessorFactory {
if (config.getMetricsRollingStatisticalWindowBuckets() != null) {
command.withMetricsRollingStatisticalWindowBuckets(config.getMetricsRollingStatisticalWindowBuckets());
}
- if (config.getRequestCacheEnabled() != null) {
- command.withRequestCacheEnabled(config.getRequestCacheEnabled());
- }
if (config.getRequestLogEnabled() != null) {
command.withRequestLogEnabled(config.getRequestLogEnabled());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
index ebfa660..d8dc5f4 100644
--- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
@@ -37,7 +37,7 @@ public class HystrixRouteConfigTest extends CamelTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .hystrix().configure().groupKey("myCamelApp").requestCacheEnabled(false).corePoolSize(5).end()
+ .hystrix().configure().groupKey("myCamelApp").requestLogEnabled(false).corePoolSize(5).end()
.to("direct:foo")
.fallback()
.transform().constant("Fallback message")
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
index 887e81e..1035cde 100644
--- a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
+++ b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
@@ -23,7 +23,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
- <hystrixConfiguration id="hysConfig" requestCacheEnabled="false" corePoolSize="5"/>
+ <hystrixConfiguration id="hysConfig" requestLogEnabled="false" corePoolSize="5"/>
<route>
<from uri="direct:start"/>
http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
index 40a8ccb..6d62e40 100644
--- a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
+++ b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
@@ -25,7 +25,7 @@
<route>
<from uri="direct:start"/>
<hystrix>
- <hystrixConfiguration groupKey="myCamelApp" requestCacheEnabled="false" corePoolSize="5"/>
+ <hystrixConfiguration groupKey="myCamelApp" requestLogEnabled="false" corePoolSize="5"/>
<to uri="direct:foo"/>
<fallback>
<transform>