You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/19 19:28:10 UTC

[1/2] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix.

Repository: camel
Updated Branches:
  refs/heads/master 62bbf9b23 -> 41df4acb2


CAMEL-9879: Circuit Breaker EIP - That is using hystrix.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8732df93
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8732df93
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8732df93

Branch: refs/heads/master
Commit: 8732df935968b77720002ee347f84536be4bd481
Parents: 62bbf9b
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 18:56:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 18:56:35 2016 +0200

----------------------------------------------------------------------
 .../hystrix/processor/HystrixProcessor.java     | 27 +++++++++++++++++---
 .../processor/HystrixProcessorCommand.java      | 26 +++++++------------
 2 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8732df93/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index 1b497bf..f59c1cf 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -24,6 +24,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.Message;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.IdAware;
@@ -88,10 +89,30 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        // run this as if we run inside try .. catch so there is no regular Camel error handler
+        exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+
         try {
-            command.queue();
+            // create command
+            HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+
+            // execute the command asynchronous and observe when its done
+            command.observe().subscribe((msg) -> {
+                    if (command.isResponseFromCache()) {
+                        // its from cache so need to copy it into the exchange
+                        Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+                        target.copyFrom(msg);
+                    } else {
+                        // if it was not from cache then run/fallback was executed and the result
+                        // is already set correctly on the exchange and we do not need to do anything
+                    }
+                }, throwable -> {
+                    exchange.setException(throwable);
+                }, () -> {
+                    exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
+                    callback.done(false);
+                });
         } catch (Throwable e) {
             // error adding to queue, so set as error and we are done
             exchange.setException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/8732df93/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 0ba6158..26243b57 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -21,13 +21,14 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Hystrix Command for the Camel Hystrix EIP.
  */
-public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
+public class HystrixProcessorCommand extends HystrixCommand<Message> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class);
     private final Exchange exchange;
@@ -60,11 +61,11 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
     }
 
     @Override
-    protected Exchange getFallback() {
+    protected Message getFallback() {
         // only run fallback if there was an exception
         Exception exception = exchange.getException();
         if (exception == null) {
-            return exchange;
+            return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
         }
 
         try {
@@ -91,18 +92,15 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
         } finally {
             LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
             exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
-            callback.done(false);
         }
 
-        return exchange;
+        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
     }
 
     @Override
-    protected Exchange run() throws Exception {
+    protected Message run() throws Exception {
         LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
 
-        // run this as if we run inside try .. catch so there is no regular Camel error handler
-        exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
         try {
             processor.process(exchange, callback);
         } catch (Exception e) {
@@ -116,15 +114,9 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
         if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
             throw exchange.getException();
         }
-        // no fallback then we are done
-        try {
-            LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
-            exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
-            callback.done(false);
-        } catch (Throwable e) {
-            exchange.setException(e);
-        }
 
-        return exchange;
+        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+        // no fallback then we are done
+        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
     }
 }


[2/2] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix.

Posted by da...@apache.org.
CAMEL-9879: Circuit Breaker EIP - That is using hystrix.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/41df4acb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/41df4acb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/41df4acb

Branch: refs/heads/master
Commit: 41df4acb2e6e28d81aa9977e08f4f5f922a5b117
Parents: 8732df9
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 19 19:25:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 19 19:27:07 2016 +0200

----------------------------------------------------------------------
 .../model/HystrixConfigurationDefinition.java   | 20 ----------------
 .../apache/camel/model/HystrixDefinition.java   | 24 --------------------
 .../hystrix/processor/HystrixProcessor.java     |  7 ++----
 .../processor/HystrixProcessorCommand.java      | 18 +--------------
 .../processor/HystrixProcessorFactory.java      | 11 +--------
 .../processor/HystrixRouteConfigTest.java       |  2 +-
 .../SpringHystrixRouteConfigRefTest.xml         |  2 +-
 .../processor/SpringHystrixRouteConfigTest.xml  |  2 +-
 8 files changed, 7 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
index d4e9f52..3993f47 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
@@ -103,9 +103,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
     private Integer metricsRollingStatisticalWindowBuckets;
     @XmlAttribute
     @Metadata(label = "command", defaultValue = "true")
-    private Boolean requestCacheEnabled;
-    @XmlAttribute
-    @Metadata(label = "command", defaultValue = "true")
     private Boolean requestLogEnabled;
 
     // thread-pool
@@ -315,14 +312,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
         this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
     }
 
-    public Boolean getRequestCacheEnabled() {
-        return requestCacheEnabled;
-    }
-
-    public void setRequestCacheEnabled(Boolean requestCacheEnabled) {
-        this.requestCacheEnabled = requestCacheEnabled;
-    }
-
     public Boolean getRequestLogEnabled() {
         return requestLogEnabled;
     }
@@ -594,15 +583,6 @@ public class HystrixConfigurationDefinition extends IdentifiedType {
     }
 
     /**
-     * Whether HystrixCommand.getCacheKey() should be used with HystrixRequestCache
-     * to provide de-duplication functionality via request-scoped caching.
-     */
-    public HystrixConfigurationDefinition requestCacheEnabled(Boolean requestCacheEnabled) {
-        this.requestCacheEnabled = requestCacheEnabled;
-        return this;
-    }
-
-    /**
      * Whether HystrixCommand execution and events should be logged to HystrixRequestLog.
      */
     public HystrixConfigurationDefinition requestLogEnabled(Boolean requestLogEnabled) {

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
index 3ca72f5..6741e91 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
@@ -26,7 +26,6 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 
-import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -38,8 +37,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
 
     @XmlElement
     private HystrixConfigurationDefinition hystrixConfiguration;
-    @XmlElement
-    private ExpressionSubElementDefinition cacheKey;
     @XmlElementRef
     private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
     @XmlElement
@@ -136,14 +133,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
         this.hystrixConfigurationRef = hystrixConfigurationRef;
     }
 
-    public ExpressionSubElementDefinition getCacheKey() {
-        return cacheKey;
-    }
-
-    public void setCacheKey(ExpressionSubElementDefinition cacheKey) {
-        this.cacheKey = cacheKey;
-    }
-
     public FallbackDefinition getFallback() {
         return fallback;
     }
@@ -198,19 +187,6 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> {
     }
 
     /**
-     * Sets the expression to use for generating the cache key.
-     * <p/>
-     * Key to be used for request caching.
-     * By default this returns null which means "do not cache".
-     * To enable caching set an expression that returns a string key uniquely representing the state of a command instance.
-     * If multiple command instances in the same request scope match keys then only the first will be executed and all others returned from cache.
-     */
-    public HystrixDefinition cacheKey(Expression expression) {
-        setCacheKey(new ExpressionSubElementDefinition(expression));
-        return this;
-    }
-
-    /**
      * Sets the fallback node
      */
     public HystrixDefinition fallback() {

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index f59c1cf..3a5e896 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -23,7 +23,6 @@ import com.netflix.hystrix.HystrixCommand;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -41,13 +40,11 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
     private final HystrixCommand.Setter setter;
     private final AsyncProcessor processor;
     private final AsyncProcessor fallback;
-    private final Expression cacheKey;
 
-    public HystrixProcessor(HystrixCommand.Setter setter, Processor processor, Processor fallback, Expression cacheKey) {
+    public HystrixProcessor(HystrixCommand.Setter setter, Processor processor, Processor fallback) {
         this.setter = setter;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.fallback = AsyncProcessorConverterHelper.convert(fallback);
-        this.cacheKey = cacheKey;
     }
 
     @Override
@@ -95,7 +92,7 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
 
         try {
             // create command
-            HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback, cacheKey);
+            HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback);
 
             // execute the command asynchronous and observe when its done
             command.observe().subscribe((msg) -> {

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 26243b57..0ebc34f 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -35,29 +35,13 @@ public class HystrixProcessorCommand extends HystrixCommand<Message> {
     private final AsyncCallback callback;
     private final AsyncProcessor processor;
     private final AsyncProcessor fallback;
-    private final Expression cacheKey;
 
-    public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback, Expression cacheKey) {
+    public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) {
         super(setter);
         this.exchange = exchange;
         this.callback = callback;
         this.processor = processor;
         this.fallback = fallback;
-        this.cacheKey = cacheKey;
-    }
-
-    @Override
-    protected String getCacheKey() {
-        // TODO: require https://github.com/Netflix/Hystrix/wiki/How-To-Use#Caching
-        if (cacheKey != null) {
-            try {
-                return cacheKey.evaluate(exchange, String.class);
-            } catch (Throwable e) {
-                // ignore
-                LOG.debug("Error evaluating cache key. This exception is ignored.", e);
-            }
-        }
-        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
index 26ea912..fb9aca6 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
@@ -96,13 +96,7 @@ public class HystrixProcessorFactory implements ProcessorFactory {
                 configureHystrix(command, threadPool, config);
             }
 
-            // optional cache-key from expression
-            Expression cacheKey = null;
-            if (cb.getCacheKey() != null) {
-                cacheKey = cb.getCacheKey().createExpression(routeContext);
-            }
-
-            return new HystrixProcessor(setter, processor, fallback, cacheKey);
+            return new HystrixProcessor(setter, processor, fallback);
         } else {
             return null;
         }
@@ -170,9 +164,6 @@ public class HystrixProcessorFactory implements ProcessorFactory {
         if (config.getMetricsRollingStatisticalWindowBuckets() != null) {
             command.withMetricsRollingStatisticalWindowBuckets(config.getMetricsRollingStatisticalWindowBuckets());
         }
-        if (config.getRequestCacheEnabled() != null) {
-            command.withRequestCacheEnabled(config.getRequestCacheEnabled());
-        }
         if (config.getRequestLogEnabled() != null) {
             command.withRequestLogEnabled(config.getRequestLogEnabled());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
index ebfa660..d8dc5f4 100644
--- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteConfigTest.java
@@ -37,7 +37,7 @@ public class HystrixRouteConfigTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .hystrix().configure().groupKey("myCamelApp").requestCacheEnabled(false).corePoolSize(5).end()
+                    .hystrix().configure().groupKey("myCamelApp").requestLogEnabled(false).corePoolSize(5).end()
                         .to("direct:foo")
                     .fallback()
                         .transform().constant("Fallback message")

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
index 887e81e..1035cde 100644
--- a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
+++ b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigRefTest.xml
@@ -23,7 +23,7 @@
 
   <camelContext xmlns="http://camel.apache.org/schema/spring">
 
-    <hystrixConfiguration id="hysConfig" requestCacheEnabled="false" corePoolSize="5"/>
+    <hystrixConfiguration id="hysConfig" requestLogEnabled="false" corePoolSize="5"/>
 
     <route>
       <from uri="direct:start"/>

http://git-wip-us.apache.org/repos/asf/camel/blob/41df4acb/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
index 40a8ccb..6d62e40 100644
--- a/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
+++ b/components/camel-hystrix/src/test/resources/org/apache/camel/component/hystrix/processor/SpringHystrixRouteConfigTest.xml
@@ -25,7 +25,7 @@
     <route>
       <from uri="direct:start"/>
       <hystrix>
-        <hystrixConfiguration groupKey="myCamelApp" requestCacheEnabled="false" corePoolSize="5"/>
+        <hystrixConfiguration groupKey="myCamelApp" requestLogEnabled="false" corePoolSize="5"/>
         <to uri="direct:foo"/>
         <fallback>
           <transform>