You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/29 16:55:30 UTC

[1/4] camel git commit: CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause

Repository: camel
Updated Branches:
  refs/heads/camel-2.18.x 36e481749 -> abc319031
  refs/heads/master 28dcd4340 -> 906a612d3


CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2b5ba0bf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2b5ba0bf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2b5ba0bf

Branch: refs/heads/camel-2.18.x
Commit: 2b5ba0bfb33b3f1475cf79cb2be569725ef15d3c
Parents: 36e4817
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 14:16:59 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 14:17:24 2016 +0100

----------------------------------------------------------------------
 .../hystrix/processor/HystrixProcessorCommand.java  | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2b5ba0bf/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4face6e..4d86ef7 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hystrix.processor;
 
 import com.netflix.hystrix.HystrixCommand;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -45,10 +46,10 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message getFallback() {
-        // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
-        Throwable exception = getExecutionException();
-
         if (fallback != null || fallbackCommand != null) {
+            // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+            Throwable exception = getExecutionException();
+
             if (exception != null) {
                 LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
             } else {
@@ -100,13 +101,20 @@ public class HystrixProcessorCommand extends HystrixCommand {
         // is fallback enabled
         Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
 
+        // execution exception must take precedence over exchange exception
+        // because hystrix may have caused this command to fail due timeout or something else
+        Throwable exception = getExecutionException();
+        if (exception != null) {
+            exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+        }
+
         // if we failed then throw an exception if fallback is enabled
         if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
             throw exchange.getException();
         }
 
-        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
         // no fallback then we are done
+        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
     }
 }


[3/4] camel git commit: CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause

Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5807f211
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5807f211
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5807f211

Branch: refs/heads/master
Commit: 5807f21123093abf98472321340f59cfca293b54
Parents: 28dcd43
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 14:16:59 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:07 2016 +0100

----------------------------------------------------------------------
 .../hystrix/processor/HystrixProcessorCommand.java  | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5807f211/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4face6e..4d86ef7 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hystrix.processor;
 
 import com.netflix.hystrix.HystrixCommand;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -45,10 +46,10 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message getFallback() {
-        // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
-        Throwable exception = getExecutionException();
-
         if (fallback != null || fallbackCommand != null) {
+            // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+            Throwable exception = getExecutionException();
+
             if (exception != null) {
                 LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
             } else {
@@ -100,13 +101,20 @@ public class HystrixProcessorCommand extends HystrixCommand {
         // is fallback enabled
         Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
 
+        // execution exception must take precedence over exchange exception
+        // because hystrix may have caused this command to fail due timeout or something else
+        Throwable exception = getExecutionException();
+        if (exception != null) {
+            exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+        }
+
         // if we failed then throw an exception if fallback is enabled
         if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
             throw exchange.getException();
         }
 
-        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
         // no fallback then we are done
+        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
     }
 }


[2/4] camel git commit: CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.

Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/906a612d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/906a612d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/906a612d

Branch: refs/heads/master
Commit: 906a612d3b59c4a36ad53d084b8cef3ba608cdc4
Parents: 5807f21
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 16:10:46 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:07 2016 +0100

----------------------------------------------------------------------
 .../processor/HystrixProcessorCommand.java      | 144 ++++++++++++-------
 .../hystrix/processor/HystrixTimeoutTest.java   |  97 +++++++++++++
 .../HystrixTimeoutWithFallbackTest.java         |  80 +++++++++++
 3 files changed, 272 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4d86ef7..511a46e 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -16,11 +16,14 @@
  */
 package org.apache.camel.component.hystrix.processor;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import com.netflix.hystrix.HystrixCommand;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
     private final Processor processor;
     private final Processor fallback;
     private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
+    private final AtomicBoolean fallbackInUse = new AtomicBoolean();
+    private final Object lock = new Object();
 
     public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback,
                                    HystrixProcessorCommandFallbackViaNetwork fallbackCommand) {
@@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message getFallback() {
-        if (fallback != null || fallbackCommand != null) {
-            // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
-            Throwable exception = getExecutionException();
+        // guard by lock as the run command can be running concurrently in case hystrix caused a timeout which
+        // can cause the fallback timer to trigger this fallback at the same time the run command may be running
+        // after its processor.process method which could cause both threads to mutate the state on the exchange
+        synchronized (lock) {
+            fallbackInUse.set(true);
+        }
 
-            if (exception != null) {
-                LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+        if (fallback == null && fallbackCommand == null) {
+            // no fallback in use
+            throw new UnsupportedOperationException("No fallback available.");
+        }
+
+        // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+        Throwable exception = getExecutionException();
+
+        if (exception != null) {
+            LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+        } else {
+            LOG.debug("Error occurred processing. Will now run fallback.");
+        }
+        // store the last to endpoint as the failure endpoint
+        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+        }
+        // give the rest of the pipeline another chance
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+        exchange.removeProperty(Exchange.ROUTE_STOP);
+        exchange.setException(null);
+        // and we should not be regarded as exhausted as we are in a try .. catch block
+        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+        // run the fallback processor
+        try {
+            // use fallback command if provided (fallback via network)
+            if (fallbackCommand != null) {
+                return fallbackCommand.execute();
             } else {
-                LOG.debug("Error occurred processing. Will now run fallback.");
-            }
-            // store the last to endpoint as the failure endpoint
-            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
-            }
-            // give the rest of the pipeline another chance
-            exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
-            exchange.removeProperty(Exchange.ROUTE_STOP);
-            exchange.setException(null);
-            // and we should not be regarded as exhausted as we are in a try .. catch block
-            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-            // run the fallback processor
-            try {
-                // use fallback command if provided (fallback via network)
-                if (fallbackCommand != null) {
-                    return fallbackCommand.execute();
-                } else {
-                    LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
-                    // process the fallback until its fully done
-                    // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
-                    fallback.process(exchange);
-                    LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
+                LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
+                // process the fallback until its fully done
+                // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
+                fallback.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
             }
+        } catch (Exception e) {
+            exchange.setException(e);
         }
 
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends HystrixCommand {
     protected Message run() throws Exception {
         LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
 
+        // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
+        // in case Hystrix timeout processing and continue with the fallback etc
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
         try {
             // process the processor until its fully done
             // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
-            processor.process(exchange);
+            processor.process(copy);
         } catch (Exception e) {
-            exchange.setException(e);
+            copy.setException(e);
         }
 
-        // is fallback enabled
-        Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
-
-        // execution exception must take precedence over exchange exception
-        // because hystrix may have caused this command to fail due timeout or something else
-        Throwable exception = getExecutionException();
-        if (exception != null) {
-            exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+        // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+        // and therefore we need this thread to not do anymore if fallback is already in process
+        if (fallbackInUse.get()) {
+            LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+            return null;
         }
 
-        // if we failed then throw an exception if fallback is enabled
-        if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
-            throw exchange.getException();
-        }
+        // remember any hystrix execution exception which for example can be triggered by a hystrix timeout
+        Throwable cause = getExecutionException();
 
-        // no fallback then we are done
-        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
-        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        synchronized (lock) {
+
+            // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+            // and therefore we need this thread to not do anymore if fallback is already in process
+            if (fallbackInUse.get()) {
+                LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+                return null;
+            }
+
+            // and copy the result
+            ExchangeHelper.copyResults(exchange, copy);
+
+            // is fallback enabled
+            Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+
+            // execution exception must take precedence over exchange exception
+            // because hystrix may have caused this command to fail due timeout or something else
+            if (cause != null) {
+                exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, cause));
+            }
+
+            // if we have a fallback that can process the exchange in case of an exception
+            // then we need to trigger this by throwing an exception so Hystrix will execute the fallback
+            // if we don't have a fallback and an exception was thrown then its stored on the exchange
+            // and Camel will detect the exception anyway
+            if (fallback != null || fallbackCommand != null) {
+                if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
+                    // throwing exception will cause hystrix to execute fallback
+                    throw exchange.getException();
+                }
+            }
+
+            LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+            return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
new file mode 100644
index 0000000..b36203c
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout with Java DSL
+ */
+public class HystrixTimeoutTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        try {
+            template.requestBody("direct:start", "slow");
+            fail("Should fail due timeout");
+        } catch (Exception e) {
+            // expected a timeout
+            assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+        }
+    }
+
+    @Test
+    public void testSlowLoop() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        for (int i = 0; i < 10; i++) {
+            try {
+                log.info(">>> test run " + i + " <<<");
+                template.requestBody("direct:start", "slow");
+                fail("Should fail due timeout");
+            } catch (Exception e) {
+                // expected a timeout
+                assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+            }
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                        // use 2 second timeout
+                        .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
new file mode 100644
index 0000000..27790bb
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout and fallback with Java DSL
+ */
+public class HystrixTimeoutWithFallbackTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers the fallback
+        Object out = template.requestBody("direct:start", "slow");
+        assertEquals("Fallback response", out);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                    // use 2 second timeout
+                    .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .onFallback()
+                        // use fallback if there was an exception or timeout
+                        .log("Hystrix fallback start: ${threadName}")
+                        .transform().constant("Fallback response")
+                        .log("Hystrix fallback end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}


[4/4] camel git commit: CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.

Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/abc31903
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/abc31903
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/abc31903

Branch: refs/heads/camel-2.18.x
Commit: abc319031e09f793387fc5de1119d66a60bbf96d
Parents: 2b5ba0b
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 16:10:46 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:33 2016 +0100

----------------------------------------------------------------------
 .../processor/HystrixProcessorCommand.java      | 144 ++++++++++++-------
 .../hystrix/processor/HystrixTimeoutTest.java   |  97 +++++++++++++
 .../HystrixTimeoutWithFallbackTest.java         |  80 +++++++++++
 3 files changed, 272 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4d86ef7..511a46e 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -16,11 +16,14 @@
  */
 package org.apache.camel.component.hystrix.processor;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import com.netflix.hystrix.HystrixCommand;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
     private final Processor processor;
     private final Processor fallback;
     private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
+    private final AtomicBoolean fallbackInUse = new AtomicBoolean();
+    private final Object lock = new Object();
 
     public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback,
                                    HystrixProcessorCommandFallbackViaNetwork fallbackCommand) {
@@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message getFallback() {
-        if (fallback != null || fallbackCommand != null) {
-            // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
-            Throwable exception = getExecutionException();
+        // guard by lock as the run command can be running concurrently in case hystrix caused a timeout which
+        // can cause the fallback timer to trigger this fallback at the same time the run command may be running
+        // after its processor.process method which could cause both threads to mutate the state on the exchange
+        synchronized (lock) {
+            fallbackInUse.set(true);
+        }
 
-            if (exception != null) {
-                LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+        if (fallback == null && fallbackCommand == null) {
+            // no fallback in use
+            throw new UnsupportedOperationException("No fallback available.");
+        }
+
+        // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+        Throwable exception = getExecutionException();
+
+        if (exception != null) {
+            LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+        } else {
+            LOG.debug("Error occurred processing. Will now run fallback.");
+        }
+        // store the last to endpoint as the failure endpoint
+        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+        }
+        // give the rest of the pipeline another chance
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+        exchange.removeProperty(Exchange.ROUTE_STOP);
+        exchange.setException(null);
+        // and we should not be regarded as exhausted as we are in a try .. catch block
+        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+        // run the fallback processor
+        try {
+            // use fallback command if provided (fallback via network)
+            if (fallbackCommand != null) {
+                return fallbackCommand.execute();
             } else {
-                LOG.debug("Error occurred processing. Will now run fallback.");
-            }
-            // store the last to endpoint as the failure endpoint
-            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
-            }
-            // give the rest of the pipeline another chance
-            exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
-            exchange.removeProperty(Exchange.ROUTE_STOP);
-            exchange.setException(null);
-            // and we should not be regarded as exhausted as we are in a try .. catch block
-            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-            // run the fallback processor
-            try {
-                // use fallback command if provided (fallback via network)
-                if (fallbackCommand != null) {
-                    return fallbackCommand.execute();
-                } else {
-                    LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
-                    // process the fallback until its fully done
-                    // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
-                    fallback.process(exchange);
-                    LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
+                LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
+                // process the fallback until its fully done
+                // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
+                fallback.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
             }
+        } catch (Exception e) {
+            exchange.setException(e);
         }
 
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends HystrixCommand {
     protected Message run() throws Exception {
         LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
 
+        // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
+        // in case Hystrix timeout processing and continue with the fallback etc
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
         try {
             // process the processor until its fully done
             // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
-            processor.process(exchange);
+            processor.process(copy);
         } catch (Exception e) {
-            exchange.setException(e);
+            copy.setException(e);
         }
 
-        // is fallback enabled
-        Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
-
-        // execution exception must take precedence over exchange exception
-        // because hystrix may have caused this command to fail due timeout or something else
-        Throwable exception = getExecutionException();
-        if (exception != null) {
-            exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+        // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+        // and therefore we need this thread to not do anymore if fallback is already in process
+        if (fallbackInUse.get()) {
+            LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+            return null;
         }
 
-        // if we failed then throw an exception if fallback is enabled
-        if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
-            throw exchange.getException();
-        }
+        // remember any hystrix execution exception which for example can be triggered by a hystrix timeout
+        Throwable cause = getExecutionException();
 
-        // no fallback then we are done
-        LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
-        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        synchronized (lock) {
+
+            // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+            // and therefore we need this thread to not do anymore if fallback is already in process
+            if (fallbackInUse.get()) {
+                LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+                return null;
+            }
+
+            // and copy the result
+            ExchangeHelper.copyResults(exchange, copy);
+
+            // is fallback enabled
+            Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+
+            // execution exception must take precedence over exchange exception
+            // because hystrix may have caused this command to fail due timeout or something else
+            if (cause != null) {
+                exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, cause));
+            }
+
+            // if we have a fallback that can process the exchange in case of an exception
+            // then we need to trigger this by throwing an exception so Hystrix will execute the fallback
+            // if we don't have a fallback and an exception was thrown then its stored on the exchange
+            // and Camel will detect the exception anyway
+            if (fallback != null || fallbackCommand != null) {
+                if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
+                    // throwing exception will cause hystrix to execute fallback
+                    throw exchange.getException();
+                }
+            }
+
+            LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+            return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
new file mode 100644
index 0000000..b36203c
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout with Java DSL
+ */
+public class HystrixTimeoutTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        try {
+            template.requestBody("direct:start", "slow");
+            fail("Should fail due timeout");
+        } catch (Exception e) {
+            // expected a timeout
+            assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+        }
+    }
+
+    @Test
+    public void testSlowLoop() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        for (int i = 0; i < 10; i++) {
+            try {
+                log.info(">>> test run " + i + " <<<");
+                template.requestBody("direct:start", "slow");
+                fail("Should fail due timeout");
+            } catch (Exception e) {
+                // expected a timeout
+                assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+            }
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                        // use 2 second timeout
+                        .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
new file mode 100644
index 0000000..27790bb
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout and fallback with Java DSL
+ */
+public class HystrixTimeoutWithFallbackTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers the fallback
+        Object out = template.requestBody("direct:start", "slow");
+        assertEquals("Fallback response", out);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                    // use 2 second timeout
+                    .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .onFallback()
+                        // use fallback if there was an exception or timeout
+                        .log("Hystrix fallback start: ${threadName}")
+                        .transform().constant("Fallback response")
+                        .log("Hystrix fallback end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}