You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/17 09:59:55 UTC
[2/3] camel git commit: Throttling on Exceptions (RoutePolicy)
Throttling on Exceptions (RoutePolicy)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49bc7305
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49bc7305
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49bc7305
Branch: refs/heads/master
Commit: 49bc73052d0b7e37cfc6fc2067966f64c75aff27
Parents: a43dc75
Author: CodeSmell <mb...@gmail.com>
Authored: Fri Jan 13 10:28:25 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100
----------------------------------------------------------------------
.../impl/ThrottingExceptionHalfOpenHandler.java | 5 +
.../impl/ThrottlingExceptionRoutePolicy.java | 288 +++++++++++++++++++
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 117 ++++++++
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 116 ++++++++
...rottingExceptionRoutePolicyHalfOpenTest.java | 105 +++++++
.../camel/processor/ThrottleException.java | 22 ++
.../ThrottlingExceptionRoutePolicyTest.java | 110 +++++++
7 files changed, 763 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..0b8019f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -0,0 +1,5 @@
+package org.apache.camel.impl;
+
+public interface ThrottingExceptionHalfOpenHandler {
+ boolean isReadyToBeClosed();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
new file mode 100644
index 0000000..1647f85
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -0,0 +1,288 @@
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
+ * thrown and the threshold setting.
+ *
+ * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
+ * to a half open state and attempt to determine if the consumer can be started.
+ * There are two ways to determine if a route can be closed after being opened
+ * (1) start the consumer and check the failure threshold
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
+ * The second option allows a custom check to be performed without having to take on the possibiliy of
+ * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
+ * to determine if the processes that cause the route to be open are now available
+ */
+public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+
+ private static final int STATE_CLOSED = 0;
+ private static final int STATE_HALF_OPEN = 1;
+ private static final int STATE_OPEN = 2;
+
+ private CamelContext camelContext;
+ private final Lock lock = new ReentrantLock();
+
+ // configuration
+ private int failureThreshold;
+ private long failureWindow;
+ private long halfOpenAfter;
+ private final List<Class<?>> throttledExceptions;
+
+ // handler for half open circuit
+ // can be used instead of resuming route
+ // to check on resources
+ ThrottingExceptionHalfOpenHandler halfOpenHandler;
+
+ // stateful information
+ private Timer halfOpenTimer;
+ private AtomicInteger failures = new AtomicInteger();
+ private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+ private long lastFailure;
+ private long openedAt;
+
+ public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
+ this.throttledExceptions = handledExceptions;
+ this.failureWindow = failureWindow;
+ this.halfOpenAfter = halfOpenAfter;
+ this.failureThreshold = threshold;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+ logState();
+ }
+
+ @Override
+ public void onExchangeDone(Route route, Exchange exchange) {
+ if (hasFailed(exchange)) {
+ // record the failure
+ failures.incrementAndGet();
+ lastFailure = System.currentTimeMillis();
+ }
+
+ // check for state change
+ calculateState(route);
+ }
+
+ /**
+ * uses similar approach as {@link CircuitBreakerLoadBalancer}
+ * if the exchange has an exception that we are watching
+ * then we count that as a failure otherwise we ignore it
+ * @param exchange
+ * @return
+ */
+ private boolean hasFailed(Exchange exchange) {
+ if (exchange == null) {
+ return false;
+ }
+
+ boolean answer = false;
+
+ if (exchange.getException() != null) {
+ log.debug("exception occured on route: checking to see if I handle that");
+ if (throttledExceptions == null || throttledExceptions.isEmpty()) {
+ // if no exceptions defined then always fail
+ // (ie) assume we throttle on all exceptions
+ answer = true;
+ } else {
+ for (Class<?> exception : throttledExceptions) {
+ // will look in exception hierarchy
+ if (exchange.getException(exception) != null) {
+ answer = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
+ log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+ }
+ return answer;
+ }
+
+ private void calculateState(Route route) {
+
+ // have we reached the failure limit?
+ boolean failureLimitReached = isThresholdExceeded();
+
+ if (state.get() == STATE_CLOSED) {
+ if (failureLimitReached) {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ }
+ } else if (state.get() == STATE_HALF_OPEN) {
+ if (failureLimitReached) {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ } else {
+ log.debug("closing circuit...");
+ closeCircuit(route);
+ }
+ } else if (state.get() == STATE_OPEN) {
+ long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+ if (halfOpenAfter <= elapsedTimeSinceOpened) {
+ log.debug("checking an open circuit...");
+ if (halfOpenHandler != null) {
+ if (halfOpenHandler.isReadyToBeClosed()) {
+ log.debug("closing circuit...");
+ closeCircuit(route);
+ } else {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ }
+ } else {
+ log.debug("half opening circuit...");
+ halfOpenCircuit(route);
+ }
+ }
+ }
+
+ }
+
+ protected boolean isThresholdExceeded() {
+ boolean output = false;
+ logState();
+ // failures exceed the threshold
+ // AND the last of those failures occurred within window
+ if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
+ output = true;
+ }
+
+ return output;
+ }
+
+ protected void openCircuit(Route route) {
+ try {
+ lock.lock();
+ stopConsumer(route.getConsumer());
+ state.set(STATE_OPEN);
+ openedAt = System.currentTimeMillis();
+ halfOpenTimer = new Timer();
+ halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected void halfOpenCircuit(Route route) {
+ try {
+ lock.lock();
+ startConsumer(route.getConsumer());
+ state.set(STATE_HALF_OPEN);
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected void closeCircuit(Route route) {
+ try {
+ lock.lock();
+ startConsumer(route.getConsumer());
+ this.reset();
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * reset the route
+ */
+ private void reset() {
+ failures.set(0);
+ lastFailure = 0;
+ openedAt = 0;
+ state.set(STATE_CLOSED);
+ }
+
+ private void logState() {
+ if (log.isDebugEnabled()) {
+ log.debug(dumpState());
+ }
+ }
+
+ public String dumpState() {
+ int num = state.get();
+ String state = stateAsString(num);
+ if (failures.get() > 0) {
+ return String.format("*** State %s, failures %d, last failure %d ms ago", state, failures.get(), System.currentTimeMillis() - lastFailure);
+ } else {
+ return String.format("*** State %s, failures %d", state, failures.get());
+ }
+ }
+
+ private static String stateAsString(int num) {
+ if (num == STATE_CLOSED) {
+ return "closed";
+ } else if (num == STATE_HALF_OPEN) {
+ return "half opened";
+ } else {
+ return "opened";
+ }
+ }
+
+ class HalfOpenTask extends TimerTask {
+ private final Route route;
+ public HalfOpenTask(Route route) {
+ this.route = route;
+ }
+
+ @Override
+ public void run() {
+ calculateState(route);
+ halfOpenTimer.cancel();
+ }
+ }
+
+ public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+ return halfOpenHandler;
+ }
+
+ public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+ this.halfOpenHandler = halfOpenHandler;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..fa413f3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,117 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottleException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but should get there (yet)
+ // due to open circuit
+ // SEDA will queue it up
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(2);
+ bodies = Arrays.asList(new String[]{"Message Three", "Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..bd19be8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,116 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottleException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but never should get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..bc7432a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.reset();
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottleException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but never should get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+ from(url)
+ .routePolicy(policy)
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
new file mode 100644
index 0000000..7a648f2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -0,0 +1,22 @@
+package org.apache.camel.processor;
+
+public class ThrottleException extends RuntimeException {
+
+ private static final long serialVersionUID = 1993185881371058773L;
+
+ public ThrottleException() {
+ super();
+ }
+
+ public ThrottleException(String message) {
+ super(message);
+ }
+
+ public ThrottleException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ThrottleException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
new file mode 100644
index 0000000..aa550aa
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -0,0 +1,110 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+ private int size = 100;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+
+ @Test
+ public void testThrottlingRoutePolicyClosed() throws Exception {
+ result.expectedMinimumMessageCount(size);
+
+ for (int i = 0; i < size; i++) {
+ template.sendBody(url, "Message " + i);
+ Thread.sleep(3);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+
+ @Test
+ public void testOpenCircuitToPreventMessageThree() throws Exception {
+ result.reset();
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottleException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ template.sendBody(url, "Message One");
+ template.sendBody(url, "Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but never should get there
+ // due to open circuit
+ log.debug("sending message three");
+ template.sendBody(url, "Message Three");
+
+ Thread.sleep(2000);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new NeverCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return false;
+ }
+
+ }
+}