You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/29 16:55:30 UTC
[1/4] camel git commit: CAMEL-10662: camel-hystrix - If hystrix
timeout occurs then the hystrix timeout exception should be cause
Repository: camel
Updated Branches:
refs/heads/camel-2.18.x 36e481749 -> abc319031
refs/heads/master 28dcd4340 -> 906a612d3
CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2b5ba0bf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2b5ba0bf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2b5ba0bf
Branch: refs/heads/camel-2.18.x
Commit: 2b5ba0bfb33b3f1475cf79cb2be569725ef15d3c
Parents: 36e4817
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 14:16:59 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 14:17:24 2016 +0100
----------------------------------------------------------------------
.../hystrix/processor/HystrixProcessorCommand.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2b5ba0bf/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4face6e..4d86ef7 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.hystrix.processor;
import com.netflix.hystrix.HystrixCommand;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
@@ -45,10 +46,10 @@ public class HystrixProcessorCommand extends HystrixCommand {
@Override
protected Message getFallback() {
- // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
- Throwable exception = getExecutionException();
-
if (fallback != null || fallbackCommand != null) {
+ // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+ Throwable exception = getExecutionException();
+
if (exception != null) {
LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
} else {
@@ -100,13 +101,20 @@ public class HystrixProcessorCommand extends HystrixCommand {
// is fallback enabled
Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+ // execution exception must take precedence over exchange exception
+ // because hystrix may have caused this command to fail due timeout or something else
+ Throwable exception = getExecutionException();
+ if (exception != null) {
+ exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+ }
+
// if we failed then throw an exception if fallback is enabled
if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
throw exchange.getException();
}
- LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
// no fallback then we are done
+ LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
}
}
[3/4] camel git commit: CAMEL-10662: camel-hystrix - If hystrix
timeout occurs then the hystrix timeout exception should be cause
Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - If hystrix timeout occurs then the hystrix timeout exception should be cause
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5807f211
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5807f211
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5807f211
Branch: refs/heads/master
Commit: 5807f21123093abf98472321340f59cfca293b54
Parents: 28dcd43
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 14:16:59 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:07 2016 +0100
----------------------------------------------------------------------
.../hystrix/processor/HystrixProcessorCommand.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5807f211/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4face6e..4d86ef7 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.hystrix.processor;
import com.netflix.hystrix.HystrixCommand;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
@@ -45,10 +46,10 @@ public class HystrixProcessorCommand extends HystrixCommand {
@Override
protected Message getFallback() {
- // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
- Throwable exception = getExecutionException();
-
if (fallback != null || fallbackCommand != null) {
+ // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+ Throwable exception = getExecutionException();
+
if (exception != null) {
LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
} else {
@@ -100,13 +101,20 @@ public class HystrixProcessorCommand extends HystrixCommand {
// is fallback enabled
Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+ // execution exception must take precedence over exchange exception
+ // because hystrix may have caused this command to fail due timeout or something else
+ Throwable exception = getExecutionException();
+ if (exception != null) {
+ exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+ }
+
// if we failed then throw an exception if fallback is enabled
if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
throw exchange.getException();
}
- LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
// no fallback then we are done
+ LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
}
}
[2/4] camel git commit: CAMEL-10662: camel-hystrix - thread race when
hystrix timeout triggers then fallback can run concurrently with run. Added
some timeout related tests.
Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/906a612d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/906a612d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/906a612d
Branch: refs/heads/master
Commit: 906a612d3b59c4a36ad53d084b8cef3ba608cdc4
Parents: 5807f21
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 16:10:46 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:07 2016 +0100
----------------------------------------------------------------------
.../processor/HystrixProcessorCommand.java | 144 ++++++++++++-------
.../hystrix/processor/HystrixTimeoutTest.java | 97 +++++++++++++
.../HystrixTimeoutWithFallbackTest.java | 80 +++++++++++
3 files changed, 272 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4d86ef7..511a46e 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -16,11 +16,14 @@
*/
package org.apache.camel.component.hystrix.processor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import com.netflix.hystrix.HystrixCommand;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
private final Processor processor;
private final Processor fallback;
private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
+ private final AtomicBoolean fallbackInUse = new AtomicBoolean();
+ private final Object lock = new Object();
public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback,
HystrixProcessorCommandFallbackViaNetwork fallbackCommand) {
@@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand {
@Override
protected Message getFallback() {
- if (fallback != null || fallbackCommand != null) {
- // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
- Throwable exception = getExecutionException();
+ // guard by lock as the run command can be running concurrently in case hystrix caused a timeout which
+ // can cause the fallback timer to trigger this fallback at the same time the run command may be running
+ // after its processor.process method which could cause both threads to mutate the state on the exchange
+ synchronized (lock) {
+ fallbackInUse.set(true);
+ }
- if (exception != null) {
- LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+ if (fallback == null && fallbackCommand == null) {
+ // no fallback in use
+ throw new UnsupportedOperationException("No fallback available.");
+ }
+
+ // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+ Throwable exception = getExecutionException();
+
+ if (exception != null) {
+ LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+ } else {
+ LOG.debug("Error occurred processing. Will now run fallback.");
+ }
+ // store the last to endpoint as the failure endpoint
+ if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
+ // give the rest of the pipeline another chance
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+ exchange.removeProperty(Exchange.ROUTE_STOP);
+ exchange.setException(null);
+ // and we should not be regarded as exhausted as we are in a try .. catch block
+ exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ // run the fallback processor
+ try {
+ // use fallback command if provided (fallback via network)
+ if (fallbackCommand != null) {
+ return fallbackCommand.execute();
} else {
- LOG.debug("Error occurred processing. Will now run fallback.");
- }
- // store the last to endpoint as the failure endpoint
- if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- }
- // give the rest of the pipeline another chance
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
- exchange.removeProperty(Exchange.ROUTE_STOP);
- exchange.setException(null);
- // and we should not be regarded as exhausted as we are in a try .. catch block
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
- // run the fallback processor
- try {
- // use fallback command if provided (fallback via network)
- if (fallbackCommand != null) {
- return fallbackCommand.execute();
- } else {
- LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
- // process the fallback until its fully done
- // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
- fallback.process(exchange);
- LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
- }
- } catch (Exception e) {
- exchange.setException(e);
+ LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
+ // process the fallback until its fully done
+ // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
+ fallback.process(exchange);
+ LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
}
+ } catch (Exception e) {
+ exchange.setException(e);
}
return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends HystrixCommand {
protected Message run() throws Exception {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
+ // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
+ // in case Hystrix timeout processing and continue with the fallback etc
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
try {
// process the processor until its fully done
// (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
- processor.process(exchange);
+ processor.process(copy);
} catch (Exception e) {
- exchange.setException(e);
+ copy.setException(e);
}
- // is fallback enabled
- Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
-
- // execution exception must take precedence over exchange exception
- // because hystrix may have caused this command to fail due timeout or something else
- Throwable exception = getExecutionException();
- if (exception != null) {
- exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+ // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+ // and therefore we need this thread to not do anymore if fallback is already in process
+ if (fallbackInUse.get()) {
+ LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+ return null;
}
- // if we failed then throw an exception if fallback is enabled
- if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
- throw exchange.getException();
- }
+ // remember any hystrix execution exception which for example can be triggered by a hystrix timeout
+ Throwable cause = getExecutionException();
- // no fallback then we are done
- LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
- return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ synchronized (lock) {
+
+ // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+ // and therefore we need this thread to not do anymore if fallback is already in process
+ if (fallbackInUse.get()) {
+ LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+ return null;
+ }
+
+ // and copy the result
+ ExchangeHelper.copyResults(exchange, copy);
+
+ // is fallback enabled
+ Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+
+ // execution exception must take precedence over exchange exception
+ // because hystrix may have caused this command to fail due timeout or something else
+ if (cause != null) {
+ exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, cause));
+ }
+
+ // if we have a fallback that can process the exchange in case of an exception
+ // then we need to trigger this by throwing an exception so Hystrix will execute the fallback
+ // if we don't have a fallback and an exception was thrown then its stored on the exchange
+ // and Camel will detect the exception anyway
+ if (fallback != null || fallbackCommand != null) {
+ if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
+ // throwing exception will cause hystrix to execute fallback
+ throw exchange.getException();
+ }
+ }
+
+ LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+ return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
new file mode 100644
index 0000000..b36203c
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout with Java DSL
+ */
+public class HystrixTimeoutTest extends CamelTestSupport {
+
+ @Test
+ public void testFast() throws Exception {
+ // this calls the fast route and therefore we get a response
+ Object out = template.requestBody("direct:start", "fast");
+ assertEquals("Fast response", out);
+ }
+
+ @Test
+ public void testSlow() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ try {
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+ }
+ }
+
+ @Test
+ public void testSlowLoop() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ for (int i = 0; i < 10; i++) {
+ try {
+ log.info(">>> test run " + i + " <<<");
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+ }
+ }
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .hystrix()
+ // use 2 second timeout
+ .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+ .log("Hystrix processing start: ${threadName}")
+ .toD("direct:${body}")
+ .log("Hystrix processing end: ${threadName}")
+ .end()
+ .log("After Hystrix ${body}");
+
+ from("direct:fast")
+ // this is a fast route and takes 1 second to respond
+ .log("Fast processing start: ${threadName}")
+ .delay(1000)
+ .transform().constant("Fast response")
+ .log("Fast processing end: ${threadName}");
+
+ from("direct:slow")
+ // this is a slow route and takes 3 second to respond
+ .log("Slow processing start: ${threadName}")
+ .delay(3000)
+ .transform().constant("Slow response")
+ .log("Slow processing end: ${threadName}");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
new file mode 100644
index 0000000..27790bb
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout and fallback with Java DSL
+ */
+public class HystrixTimeoutWithFallbackTest extends CamelTestSupport {
+
+ @Test
+ public void testFast() throws Exception {
+ // this calls the fast route and therefore we get a response
+ Object out = template.requestBody("direct:start", "fast");
+ assertEquals("Fast response", out);
+ }
+
+ @Test
+ public void testSlow() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers the fallback
+ Object out = template.requestBody("direct:start", "slow");
+ assertEquals("Fallback response", out);
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .hystrix()
+ // use 2 second timeout
+ .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+ .log("Hystrix processing start: ${threadName}")
+ .toD("direct:${body}")
+ .log("Hystrix processing end: ${threadName}")
+ .onFallback()
+ // use fallback if there was an exception or timeout
+ .log("Hystrix fallback start: ${threadName}")
+ .transform().constant("Fallback response")
+ .log("Hystrix fallback end: ${threadName}")
+ .end()
+ .log("After Hystrix ${body}");
+
+ from("direct:fast")
+ // this is a fast route and takes 1 second to respond
+ .log("Fast processing start: ${threadName}")
+ .delay(1000)
+ .transform().constant("Fast response")
+ .log("Fast processing end: ${threadName}");
+
+ from("direct:slow")
+ // this is a slow route and takes 3 second to respond
+ .log("Slow processing start: ${threadName}")
+ .delay(3000)
+ .transform().constant("Slow response")
+ .log("Slow processing end: ${threadName}");
+ }
+ };
+ }
+
+}
[4/4] camel git commit: CAMEL-10662: camel-hystrix - thread race when
hystrix timeout triggers then fallback can run concurrently with run. Added
some timeout related tests.
Posted by da...@apache.org.
CAMEL-10662: camel-hystrix - thread race when hystrix timeout triggers then fallback can run concurrently with run. Added some timeout related tests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/abc31903
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/abc31903
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/abc31903
Branch: refs/heads/camel-2.18.x
Commit: abc319031e09f793387fc5de1119d66a60bbf96d
Parents: 2b5ba0b
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 29 16:10:46 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 29 17:54:33 2016 +0100
----------------------------------------------------------------------
.../processor/HystrixProcessorCommand.java | 144 ++++++++++++-------
.../hystrix/processor/HystrixTimeoutTest.java | 97 +++++++++++++
.../HystrixTimeoutWithFallbackTest.java | 80 +++++++++++
3 files changed, 272 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4d86ef7..511a46e 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -16,11 +16,14 @@
*/
package org.apache.camel.component.hystrix.processor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import com.netflix.hystrix.HystrixCommand;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
private final Processor processor;
private final Processor fallback;
private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
+ private final AtomicBoolean fallbackInUse = new AtomicBoolean();
+ private final Object lock = new Object();
public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback,
HystrixProcessorCommandFallbackViaNetwork fallbackCommand) {
@@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand {
@Override
protected Message getFallback() {
- if (fallback != null || fallbackCommand != null) {
- // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
- Throwable exception = getExecutionException();
+ // guard by lock as the run command can be running concurrently in case hystrix caused a timeout which
+ // can cause the fallback timer to trigger this fallback at the same time the run command may be running
+ // after its processor.process method which could cause both threads to mutate the state on the exchange
+ synchronized (lock) {
+ fallbackInUse.set(true);
+ }
- if (exception != null) {
- LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+ if (fallback == null && fallbackCommand == null) {
+ // no fallback in use
+ throw new UnsupportedOperationException("No fallback available.");
+ }
+
+ // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited)
+ Throwable exception = getExecutionException();
+
+ if (exception != null) {
+ LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
+ } else {
+ LOG.debug("Error occurred processing. Will now run fallback.");
+ }
+ // store the last to endpoint as the failure endpoint
+ if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
+ // give the rest of the pipeline another chance
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+ exchange.removeProperty(Exchange.ROUTE_STOP);
+ exchange.setException(null);
+ // and we should not be regarded as exhausted as we are in a try .. catch block
+ exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ // run the fallback processor
+ try {
+ // use fallback command if provided (fallback via network)
+ if (fallbackCommand != null) {
+ return fallbackCommand.execute();
} else {
- LOG.debug("Error occurred processing. Will now run fallback.");
- }
- // store the last to endpoint as the failure endpoint
- if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- }
- // give the rest of the pipeline another chance
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
- exchange.removeProperty(Exchange.ROUTE_STOP);
- exchange.setException(null);
- // and we should not be regarded as exhausted as we are in a try .. catch block
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
- // run the fallback processor
- try {
- // use fallback command if provided (fallback via network)
- if (fallbackCommand != null) {
- return fallbackCommand.execute();
- } else {
- LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
- // process the fallback until its fully done
- // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
- fallback.process(exchange);
- LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
- }
- } catch (Exception e) {
- exchange.setException(e);
+ LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
+ // process the fallback until its fully done
+ // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
+ fallback.process(exchange);
+ LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
}
+ } catch (Exception e) {
+ exchange.setException(e);
}
return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends HystrixCommand {
protected Message run() throws Exception {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
+ // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
+ // in case Hystrix timeout processing and continue with the fallback etc
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
try {
// process the processor until its fully done
// (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
- processor.process(exchange);
+ processor.process(copy);
} catch (Exception e) {
- exchange.setException(e);
+ copy.setException(e);
}
- // is fallback enabled
- Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
-
- // execution exception must take precedence over exchange exception
- // because hystrix may have caused this command to fail due timeout or something else
- Throwable exception = getExecutionException();
- if (exception != null) {
- exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception));
+ // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+ // and therefore we need this thread to not do anymore if fallback is already in process
+ if (fallbackInUse.get()) {
+ LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+ return null;
}
- // if we failed then throw an exception if fallback is enabled
- if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
- throw exchange.getException();
- }
+ // remember any hystrix execution exception which for example can be triggered by a hystrix timeout
+ Throwable cause = getExecutionException();
- // no fallback then we are done
- LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
- return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ synchronized (lock) {
+
+ // when a hystrix timeout occurs then a hystrix timer thread executes the fallback
+ // and therefore we need this thread to not do anymore if fallback is already in process
+ if (fallbackInUse.get()) {
+ LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+ return null;
+ }
+
+ // and copy the result
+ ExchangeHelper.copyResults(exchange, copy);
+
+ // is fallback enabled
+ Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+
+ // execution exception must take precedence over exchange exception
+ // because hystrix may have caused this command to fail due timeout or something else
+ if (cause != null) {
+ exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, cause));
+ }
+
+ // if we have a fallback that can process the exchange in case of an exception
+ // then we need to trigger this by throwing an exception so Hystrix will execute the fallback
+ // if we don't have a fallback and an exception was thrown then its stored on the exchange
+ // and Camel will detect the exception anyway
+ if (fallback != null || fallbackCommand != null) {
+ if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) {
+ // throwing exception will cause hystrix to execute fallback
+ throw exchange.getException();
+ }
+ }
+
+ LOG.debug("Running processor: {} with exchange: {} done", processor, exchange);
+ return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
new file mode 100644
index 0000000..b36203c
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout with Java DSL
+ */
+public class HystrixTimeoutTest extends CamelTestSupport {
+
+ @Test
+ public void testFast() throws Exception {
+ // this calls the fast route and therefore we get a response
+ Object out = template.requestBody("direct:start", "fast");
+ assertEquals("Fast response", out);
+ }
+
+ @Test
+ public void testSlow() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ try {
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+ }
+ }
+
+ @Test
+ public void testSlowLoop() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ for (int i = 0; i < 10; i++) {
+ try {
+ log.info(">>> test run " + i + " <<<");
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause().getCause());
+ }
+ }
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .hystrix()
+ // use 2 second timeout
+ .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+ .log("Hystrix processing start: ${threadName}")
+ .toD("direct:${body}")
+ .log("Hystrix processing end: ${threadName}")
+ .end()
+ .log("After Hystrix ${body}");
+
+ from("direct:fast")
+ // this is a fast route and takes 1 second to respond
+ .log("Fast processing start: ${threadName}")
+ .delay(1000)
+ .transform().constant("Fast response")
+ .log("Fast processing end: ${threadName}");
+
+ from("direct:slow")
+ // this is a slow route and takes 3 second to respond
+ .log("Slow processing start: ${threadName}")
+ .delay(3000)
+ .transform().constant("Slow response")
+ .log("Slow processing end: ${threadName}");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/abc31903/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
new file mode 100644
index 0000000..27790bb
--- /dev/null
+++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout and fallback with Java DSL
+ */
+public class HystrixTimeoutWithFallbackTest extends CamelTestSupport {
+
+ @Test
+ public void testFast() throws Exception {
+ // this calls the fast route and therefore we get a response
+ Object out = template.requestBody("direct:start", "fast");
+ assertEquals("Fast response", out);
+ }
+
+ @Test
+ public void testSlow() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers the fallback
+ Object out = template.requestBody("direct:start", "slow");
+ assertEquals("Fallback response", out);
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .hystrix()
+ // use 2 second timeout
+ .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+ .log("Hystrix processing start: ${threadName}")
+ .toD("direct:${body}")
+ .log("Hystrix processing end: ${threadName}")
+ .onFallback()
+ // use fallback if there was an exception or timeout
+ .log("Hystrix fallback start: ${threadName}")
+ .transform().constant("Fallback response")
+ .log("Hystrix fallback end: ${threadName}")
+ .end()
+ .log("After Hystrix ${body}");
+
+ from("direct:fast")
+ // this is a fast route and takes 1 second to respond
+ .log("Fast processing start: ${threadName}")
+ .delay(1000)
+ .transform().constant("Fast response")
+ .log("Fast processing end: ${threadName}");
+
+ from("direct:slow")
+ // this is a slow route and takes 3 second to respond
+ .log("Slow processing start: ${threadName}")
+ .delay(3000)
+ .transform().constant("Slow response")
+ .log("Slow processing end: ${threadName}");
+ }
+ };
+ }
+
+}