You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/17 09:59:55 UTC

[2/3] camel git commit: Throttling on Exceptions (RoutePolicy)

Throttling on Exceptions (RoutePolicy)

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49bc7305
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49bc7305
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49bc7305

Branch: refs/heads/master
Commit: 49bc73052d0b7e37cfc6fc2067966f64c75aff27
Parents: a43dc75
Author: CodeSmell <mb...@gmail.com>
Authored: Fri Jan 13 10:28:25 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100

----------------------------------------------------------------------
 .../impl/ThrottingExceptionHalfOpenHandler.java |   5 +
 .../impl/ThrottlingExceptionRoutePolicy.java    | 288 +++++++++++++++++++
 ...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 117 ++++++++
 ...ExceptionRoutePolicyHalfOpenHandlerTest.java | 116 ++++++++
 ...rottingExceptionRoutePolicyHalfOpenTest.java | 105 +++++++
 .../camel/processor/ThrottleException.java      |  22 ++
 .../ThrottlingExceptionRoutePolicyTest.java     | 110 +++++++
 7 files changed, 763 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..0b8019f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -0,0 +1,5 @@
+package org.apache.camel.impl;
+
+/**
+ * Callback consulted by {@link ThrottlingExceptionRoutePolicy} once its circuit has been open
+ * for the configured half-open delay. When a handler is set, the policy asks it whether the
+ * circuit can be closed instead of moving to the half open state.
+ */
+public interface ThrottingExceptionHalfOpenHandler {
+    /**
+     * Decides whether the open circuit may be closed again.
+     *
+     * @return {@code true} to have the policy close the circuit (it starts the route's consumer
+     *         and resets its failure tracking), {@code false} to have it open the circuit again
+     *         and re-check after another half-open delay
+     */
+    boolean isReadyToBeClosed();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
new file mode 100644
index 0000000..1647f85
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -0,0 +1,288 @@
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
+ * thrown and the threshold setting. 
+ * 
+ * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. 
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move 
+ * to a half open state and attempt to determine if the consumer can be started.
+ * There are two ways to determine if a route can be closed after being opened
+ * (1) start the consumer and check the failure threshold
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler} 
+ * The second option allows a custom check to be performed without having to take on the possibiliy of 
+ * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
+ * to determine if the processes that cause the route to be open are now available  
+ */
+public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+    
+    private static final int STATE_CLOSED = 0;
+    private static final int STATE_HALF_OPEN = 1;
+    private static final int STATE_OPEN = 2;
+    
+    private CamelContext camelContext;
+    private final Lock lock = new ReentrantLock();
+    
+    // configuration
+    private int failureThreshold;
+    private long failureWindow;
+    private long halfOpenAfter;
+    private final List<Class<?>> throttledExceptions;
+    
+    // handler for half open circuit
+    // can be used instead of resuming route
+    // to check on resources
+    ThrottingExceptionHalfOpenHandler halfOpenHandler;
+
+    // stateful information
+    private Timer halfOpenTimer;
+    private AtomicInteger failures = new AtomicInteger();
+    private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+    private long lastFailure;
+    private long openedAt;
+    
+    public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
+        this.throttledExceptions = handledExceptions;
+        this.failureWindow = failureWindow;
+        this.halfOpenAfter = halfOpenAfter;
+        this.failureThreshold = threshold;
+    }
+    
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+        logState();
+    }
+    
+    @Override
+    public void onExchangeDone(Route route, Exchange exchange) {
+        if (hasFailed(exchange)) {
+            // record the failure
+            failures.incrementAndGet();
+            lastFailure = System.currentTimeMillis();
+        } 
+        
+        // check for state change
+        calculateState(route);
+    }
+    
+    /**
+     * uses similar approach as {@link CircuitBreakerLoadBalancer}
+     * if the exchange has an exception that we are watching 
+     * then we count that as a failure otherwise we ignore it
+     * @param exchange
+     * @return
+     */
+    private boolean hasFailed(Exchange exchange) {
+        if (exchange == null) {
+            return false;
+        }
+
+        boolean answer = false;
+
+        if (exchange.getException() != null) {
+            log.debug("exception occured on route: checking to see if I handle that");
+            if (throttledExceptions == null || throttledExceptions.isEmpty()) {
+                // if no exceptions defined then always fail 
+                // (ie) assume we throttle on all exceptions
+                answer = true;
+            } else {
+                for (Class<?> exception : throttledExceptions) {
+                    // will look in exception hierarchy
+                    if (exchange.getException(exception) != null) {
+                        answer = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
+            log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+        }
+        return answer;
+    }
+
+    private void calculateState(Route route) {
+        
+        // have we reached the failure limit?
+        boolean failureLimitReached = isThresholdExceeded();
+        
+        if (state.get() == STATE_CLOSED) {
+            if (failureLimitReached) {
+                log.debug("opening circuit...");
+                openCircuit(route);
+            }
+        } else if (state.get() == STATE_HALF_OPEN) {
+            if (failureLimitReached) {
+                log.debug("opening circuit...");
+                openCircuit(route);
+            } else {
+                log.debug("closing circuit...");
+                closeCircuit(route);
+            }
+        } else if (state.get() == STATE_OPEN) {
+            long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+            if (halfOpenAfter <= elapsedTimeSinceOpened) {
+                log.debug("checking an open circuit...");
+                if (halfOpenHandler != null) {
+                    if (halfOpenHandler.isReadyToBeClosed()) {
+                        log.debug("closing circuit...");
+                        closeCircuit(route);
+                    } else {
+                        log.debug("opening circuit...");
+                        openCircuit(route);
+                    }
+                } else {
+                    log.debug("half opening circuit...");
+                    halfOpenCircuit(route);                    
+                }
+            } 
+        }
+        
+    }
+    
+    protected boolean isThresholdExceeded() {
+        boolean output = false;
+        logState();
+        // failures exceed the threshold 
+        // AND the last of those failures occurred within window
+        if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
+            output = true;
+        }
+        
+        return output;
+    }
+        
+    protected void openCircuit(Route route) {
+        try {
+            lock.lock();
+            stopConsumer(route.getConsumer());
+            state.set(STATE_OPEN);
+            openedAt = System.currentTimeMillis();
+            halfOpenTimer = new Timer();
+            halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    protected void halfOpenCircuit(Route route) {
+        try {
+            lock.lock();
+            startConsumer(route.getConsumer());
+            state.set(STATE_HALF_OPEN);
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    protected void closeCircuit(Route route) {
+        try {
+            lock.lock();
+            startConsumer(route.getConsumer());
+            this.reset();
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    /**
+     * reset the route 
+     */
+    private void reset() {
+        failures.set(0);
+        lastFailure = 0;
+        openedAt = 0;
+        state.set(STATE_CLOSED);
+    }
+    
+    private void logState() {
+        if (log.isDebugEnabled()) {
+            log.debug(dumpState());
+        }
+    }
+    
+    public String dumpState() {
+        int num = state.get();
+        String state = stateAsString(num);
+        if (failures.get() > 0) {
+            return String.format("*** State %s, failures %d, last failure %d ms ago", state, failures.get(), System.currentTimeMillis() - lastFailure);
+        } else {
+            return String.format("*** State %s, failures %d", state, failures.get());
+        }
+    }
+    
+    private static String stateAsString(int num) {
+        if (num == STATE_CLOSED) {
+            return "closed";
+        } else if (num == STATE_HALF_OPEN) {
+            return "half opened";
+        } else {
+            return "opened";
+        }
+    }
+    
+    class HalfOpenTask extends TimerTask {
+        private final Route route;
+        public HalfOpenTask(Route route) {
+            this.route = route;
+        }
+        
+        @Override
+        public void run() {
+            calculateState(route);
+            halfOpenTimer.cancel();
+        }
+    }
+    
+    public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+        return halfOpenHandler;
+    }
+
+    public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+        this.halfOpenHandler = halfOpenHandler;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..fa413f3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,117 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Exercises {@link ThrottlingExceptionRoutePolicy} with a half open handler on a SEDA route:
+ * two failing messages open the circuit, a third message is sent while it is open, and after
+ * the half-open delay the always-closing handler lets messages three and four through.
+ */
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+
+    private final String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // every exchange reaching the mock fails with the throttled exception
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but should not get there (yet)
+        // due to open circuit
+        // SEDA will queue it up
+        LOG.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        result.expectedMessageCount(2);
+        bodies = Arrays.asList("Message Three", "Message Four");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // wait long enough for
+        // half open attempt
+        // (3000 + 4000 ms in total, past the 5000 ms half-open delay)
+        Thread.sleep(4000);
+
+        // send message
+        // should get through
+        LOG.debug("sending message four");
+        sendMessage("Message Four");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                // null exception list: the policy throttles on every exception
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /** Handler that always reports the circuit as ready to be closed. */
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+
+    }
+
+    /**
+     * Sends a body to the route and logs, rather than propagates, any send failure.
+     */
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            // an exception without a cause must not turn into a NullPointerException here
+            Throwable reason = e.getCause() != null ? e.getCause() : e;
+            LOG.debug("Error sending:" + reason.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..bd19be8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,116 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Exercises {@link ThrottlingExceptionRoutePolicy} with a half open handler on a direct route:
+ * two failing messages open the circuit, a third message sent while it is open must not arrive,
+ * and after the half-open delay the always-closing handler lets message four through.
+ */
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
+
+    private final String url = "direct:start";
+    private MockEndpoint result;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // every exchange reaching the mock fails with the throttled exception
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but never should get there
+        // due to open circuit
+        LOG.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList("Message Four");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // wait long enough for
+        // half open attempt
+        // (3000 + 4000 ms in total, past the 5000 ms half-open delay)
+        Thread.sleep(4000);
+
+        // send message
+        // should get through
+        LOG.debug("sending message four");
+        sendMessage("Message Four");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                // null exception list: the policy throttles on every exception
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /** Handler that always reports the circuit as ready to be closed. */
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+
+    }
+
+    /**
+     * Sends a body to the route and logs, rather than propagates, any send failure.
+     */
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            // an exception without a cause must not turn into a NullPointerException here
+            Throwable reason = e.getCause() != null ? e.getCause() : e;
+            LOG.debug("Error sending:" + reason.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..bc7432a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
+    
+    private String url = "direct:start";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        sendMessage("Message Three");
+        
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList(new String[]{"Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                
+                from(url)
+                    .routePolicy(policy)
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
new file mode 100644
index 0000000..7a648f2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -0,0 +1,22 @@
+package org.apache.camel.processor;
+
+/**
+ * Unchecked exception the throttling route policy tests set on exchanges to simulate a failure.
+ */
+public class ThrottleException extends RuntimeException {
+
+    private static final long serialVersionUID = 1993185881371058773L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public ThrottleException() {
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public ThrottleException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ThrottleException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause   the underlying failure
+     */
+    public ThrottleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
new file mode 100644
index 0000000..aa550aa
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -0,0 +1,110 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link ThrottlingExceptionRoutePolicy} on a SEDA route with a handler that never
+ * allows the circuit to close: without failures every message arrives, and after two failures
+ * a third message must not reach the mock endpoint.
+ */
+public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
+
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    private int size = 100;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    /** With no failing exchanges the circuit stays closed and all messages arrive. */
+    @Test
+    public void testThrottlingRoutePolicyClosed() throws Exception {
+        result.expectedMinimumMessageCount(size);
+
+        int sent = 0;
+        while (sent < size) {
+            template.sendBody(url, "Message " + sent);
+            Thread.sleep(3);
+            sent++;
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /** Two failures open the circuit, so only the first two messages reach the mock. */
+    @Test
+    public void testOpenCircuitToPreventMessageThree() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> expectedBodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(expectedBodies);
+
+        result.whenAnyExchangeReceived(failingProcessor());
+
+        // send two messages which will fail
+        template.sendBody(url, "Message One");
+        template.sendBody(url, "Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        template.sendBody(url, "Message Three");
+
+        Thread.sleep(2000);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * @return a processor that fails each exchange with a {@link ThrottleException}
+     *         carrying the message body as its detail message
+     */
+    private Processor failingProcessor() {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String body = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(body));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int failureLimit = 2;
+                long windowMillis = 30;
+                long recheckDelayMillis = 5000;
+                // null exception list: the policy throttles on every exception
+                ThrottlingExceptionRoutePolicy throttlingPolicy =
+                    new ThrottlingExceptionRoutePolicy(failureLimit, windowMillis, recheckDelayMillis, null);
+                throttlingPolicy.setHalfOpenHandler(new NeverCloseHandler());
+
+                from(url)
+                    .routePolicy(throttlingPolicy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /** Handler that never reports the circuit as ready to be closed. */
+    public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return false;
+        }
+
+    }
+}