You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/17 09:59:54 UTC

[1/3] camel git commit: Add Apache License to classes

Repository: camel
Updated Branches:
  refs/heads/master a43dc750f -> 351375214


Add Apache License to classes

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72438fd8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72438fd8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72438fd8

Branch: refs/heads/master
Commit: 72438fd8140b531fb633e9d22760fffd8bcb59bb
Parents: 49bc730
Author: CodeSmell <mb...@gmail.com>
Authored: Mon Jan 16 10:24:22 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100

----------------------------------------------------------------------
 .../impl/ThrottingExceptionHalfOpenHandler.java | 26 ++++++++++++++
 .../impl/ThrottlingExceptionRoutePolicy.java    | 36 ++++++++++++++------
 ...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 22 ++++++++++--
 ...ExceptionRoutePolicyHalfOpenHandlerTest.java | 22 ++++++++++--
 ...rottingExceptionRoutePolicyHalfOpenTest.java | 22 ++++++++++--
 .../camel/processor/ThrottleException.java      | 16 +++++++++
 .../ThrottlingExceptionRoutePolicyTest.java     | 22 ++++++++++--
 7 files changed, 144 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
index 0b8019f..82cff2c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -1,5 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.impl;
 
+/**
+ * used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
+ * to handle the half open circuit state and how to determine if a route
+ * should be closed
+ *
+ */
 public interface ThrottingExceptionHalfOpenHandler {
+    /**
+     * check the state of the Camel route
+     * @return true to close the route and false to leave open
+     */
     boolean isReadyToBeClosed();
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 1647f85..04813b8 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -1,23 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.impl;
 
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
 import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.support.RoutePolicySupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
  * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
@@ -53,7 +68,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     // handler for half open circuit
     // can be used instead of resuming route
     // to check on resources
-    ThrottingExceptionHalfOpenHandler halfOpenHandler;
+    private ThrottingExceptionHalfOpenHandler halfOpenHandler;
 
     // stateful information
     private Timer halfOpenTimer;
@@ -266,7 +281,8 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     
     class HalfOpenTask extends TimerTask {
         private final Route route;
-        public HalfOpenTask(Route route) {
+        
+        HalfOpenTask(Route route) {
             this.route = route;
         }
         

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
index fa413f3..bace3a0 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
     private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
     

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
index bd19be8..b572888 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
     private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
     

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
index bc7432a..8f4b993 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -11,9 +30,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
     private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
     

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
index 7a648f2..e779907 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
 public class ThrottleException extends RuntimeException {

http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index aa550aa..91f4606 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
     private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
     


[3/3] camel git commit: Fixed typo in names. This closes #1400

Posted by da...@apache.org.
Fixed typo in names. This closes #1400


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/35137521
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/35137521
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/35137521

Branch: refs/heads/master
Commit: 351375214872adc7a16ac18839cf9a8552ad4dc8
Parents: 72438fd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 17 10:26:38 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:26:38 2017 +0100

----------------------------------------------------------------------
 .../impl/ThrottingExceptionHalfOpenHandler.java |  31 -----
 .../ThrottlingExceptionHalfOpenHandler.java     |  32 +++++
 .../impl/ThrottlingExceptionRoutePolicy.java    |  36 ++---
 ...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 -------------------
 ...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ------------------
 ...rottingExceptionRoutePolicyHalfOpenTest.java | 121 -----------------
 .../camel/processor/ThrottleException.java      |  38 ------
 .../camel/processor/ThrottlingException.java    |  38 ++++++
 ...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 +++++++++++++++++++
 ...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ++++++++++++++++++
 ...ottlingExceptionRoutePolicyHalfOpenTest.java | 121 +++++++++++++++++
 .../ThrottlingExceptionRoutePolicyTest.java     |   8 +-
 12 files changed, 477 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
deleted file mode 100644
index 82cff2c..0000000
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-/**
- * used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
- * to handle the half open circuit state and how to determine if a route
- * should be closed
- *
- */
-public interface ThrottingExceptionHalfOpenHandler {
-    /**
-     * check the state of the Camel route
-     * @return true to close the route and false to leave open
-     */
-    boolean isReadyToBeClosed();
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..d9d6269
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+/**
+ * Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
+ * to handle the half open circuit state and how to determine if a route
+ * should be closed
+ *
+ */
+public interface ThrottlingExceptionHalfOpenHandler {
+
+    /**
+     * Check the state of the Camel route
+     * @return true to close the route and false to leave open
+     */
+    boolean isReadyToBeClosed();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 04813b8..cda8018 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
  * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
  * thrown and the threshold setting. 
  * 
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
  * to determine if the processes that cause the route to be open are now available  
  */
 public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
-    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
     
     private static final int STATE_CLOSED = 0;
     private static final int STATE_HALF_OPEN = 1;
@@ -68,7 +68,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     // handler for half open circuit
     // can be used instead of resuming route
     // to check on resources
-    private ThrottingExceptionHalfOpenHandler halfOpenHandler;
+    private ThrottlingExceptionHalfOpenHandler halfOpenHandler;
 
     // stateful information
     private Timer halfOpenTimer;
@@ -96,7 +96,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
 
     @Override
     public void onInit(Route route) {
-        log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+        LOG.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
         logState();
     }
     
@@ -127,7 +127,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         boolean answer = false;
 
         if (exchange.getException() != null) {
-            log.debug("exception occured on route: checking to see if I handle that");
+            LOG.debug("exception occured on route: checking to see if I handle that");
             if (throttledExceptions == null || throttledExceptions.isEmpty()) {
                 // if no exceptions defined then always fail 
                 // (ie) assume we throttle on all exceptions
@@ -143,9 +143,9 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
             }
         }
 
-        if (log.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
-            log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+            LOG.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
         }
         return answer;
     }
@@ -157,31 +157,31 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         
         if (state.get() == STATE_CLOSED) {
             if (failureLimitReached) {
-                log.debug("opening circuit...");
+                LOG.debug("opening circuit...");
                 openCircuit(route);
             }
         } else if (state.get() == STATE_HALF_OPEN) {
             if (failureLimitReached) {
-                log.debug("opening circuit...");
+                LOG.debug("opening circuit...");
                 openCircuit(route);
             } else {
-                log.debug("closing circuit...");
+                LOG.debug("closing circuit...");
                 closeCircuit(route);
             }
         } else if (state.get() == STATE_OPEN) {
             long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
             if (halfOpenAfter <= elapsedTimeSinceOpened) {
-                log.debug("checking an open circuit...");
+                LOG.debug("checking an open circuit...");
                 if (halfOpenHandler != null) {
                     if (halfOpenHandler.isReadyToBeClosed()) {
-                        log.debug("closing circuit...");
+                        LOG.debug("closing circuit...");
                         closeCircuit(route);
                     } else {
-                        log.debug("opening circuit...");
+                        LOG.debug("opening circuit...");
                         openCircuit(route);
                     }
                 } else {
-                    log.debug("half opening circuit...");
+                    LOG.debug("half opening circuit...");
                     halfOpenCircuit(route);                    
                 }
             } 
@@ -254,8 +254,8 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     }
     
     private void logState() {
-        if (log.isDebugEnabled()) {
-            log.debug(dumpState());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(dumpState());
         }
     }
     
@@ -293,11 +293,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         }
     }
     
-    public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+    public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
         return halfOpenHandler;
     }
 
-    public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+    public void setHalfOpenHandler(ThrottlingExceptionHalfOpenHandler halfOpenHandler) {
         this.halfOpenHandler = halfOpenHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
deleted file mode 100644
index bace3a0..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
-    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
-    
-    private String url = "seda:foo?concurrentConsumers=20";
-    private MockEndpoint result;
-    
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        this.setUseRouteBuilder(true);
-        result = getMockEndpoint("mock:result");
-        
-        context.getShutdownStrategy().setTimeout(1);
-    }
-    
-    @Test
-    public void testHalfOpenCircuit() throws Exception {
-        result.expectedMessageCount(2);
-        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        result.whenAnyExchangeReceived(new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                String msg = exchange.getIn().getBody(String.class);
-                exchange.setException(new ThrottleException(msg));
-            }
-        });
-        
-        // send two messages which will fail
-        sendMessage("Message One");
-        sendMessage("Message Two");
-        
-        // wait long enough to 
-        // have the route shutdown
-        Thread.sleep(3000);
-        
-        // send more messages 
-        // but should get there (yet)
-        // due to open circuit
-        // SEDA will queue it up
-        log.debug("sending message three");
-        sendMessage("Message Three");
-
-        assertMockEndpointsSatisfied();
-        
-        result.reset();
-        result.expectedMessageCount(2);
-        bodies = Arrays.asList(new String[]{"Message Three", "Message Four"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        // wait long enough for
-        // half open attempt
-        Thread.sleep(4000);
-        
-        // send message
-        // should get through
-        log.debug("sending message four");
-        sendMessage("Message Four");
-        
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                int threshold = 2;
-                long failureWindow = 30;
-                long halfOpenAfter = 5000;
-                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
-                policy.setHalfOpenHandler(new AlwaysCloseHandler());
-                
-                from(url)
-                    .routePolicy(policy)
-                    .log("${body}")
-                    .to("log:foo?groupSize=10")
-                    .to("mock:result");
-            }
-        };
-    }
-    
-    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
-        @Override
-        public boolean isReadyToBeClosed() {
-            return true;
-        }
-        
-    }
-    
-    protected void sendMessage(String bodyText) {
-        try {
-            template.sendBody(url, bodyText);
-        } catch (Exception e) {
-            log.debug("Error sending:" + e.getCause().getMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
deleted file mode 100644
index b572888..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
-    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
-    
-    private String url = "direct:start";
-    private MockEndpoint result;
-    
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        this.setUseRouteBuilder(true);
-        result = getMockEndpoint("mock:result");
-        
-        context.getShutdownStrategy().setTimeout(1);
-    }
-    
-    @Test
-    public void testHalfOpenCircuit() throws Exception {
-        result.expectedMessageCount(2);
-        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        result.whenAnyExchangeReceived(new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                String msg = exchange.getIn().getBody(String.class);
-                exchange.setException(new ThrottleException(msg));
-            }
-        });
-        
-        // send two messages which will fail
-        sendMessage("Message One");
-        sendMessage("Message Two");
-        
-        // wait long enough to 
-        // have the route shutdown
-        Thread.sleep(3000);
-        
-        // send more messages 
-        // but never should get there
-        // due to open circuit
-        log.debug("sending message three");
-        sendMessage("Message Three");
-
-        assertMockEndpointsSatisfied();
-        
-        result.reset();
-        result.expectedMessageCount(1);
-        bodies = Arrays.asList(new String[]{"Message Four"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        // wait long enough for
-        // half open attempt
-        Thread.sleep(4000);
-        
-        // send message
-        // should get through
-        log.debug("sending message four");
-        sendMessage("Message Four");
-        
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                int threshold = 2;
-                long failureWindow = 30;
-                long halfOpenAfter = 5000;
-                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
-                policy.setHalfOpenHandler(new AlwaysCloseHandler());
-                
-                from(url)
-                    .routePolicy(policy)
-                    .log("${body}")
-                    .to("log:foo?groupSize=10")
-                    .to("mock:result");
-            }
-        };
-    }
-    
-    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
-        @Override
-        public boolean isReadyToBeClosed() {
-            return true;
-        }
-        
-    }
-    
-    protected void sendMessage(String bodyText) {
-        try {
-            template.sendBody(url, bodyText);
-        } catch (Exception e) {
-            log.debug("Error sending:" + e.getCause().getMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
deleted file mode 100644
index 8f4b993..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
-    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
-    
-    private String url = "direct:start";
-    private MockEndpoint result;
-    
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        this.setUseRouteBuilder(true);
-        result = getMockEndpoint("mock:result");
-        
-        context.getShutdownStrategy().setTimeout(1);
-    }
-    
-    @Test
-    public void testHalfOpenCircuit() throws Exception {
-        result.reset();
-        result.expectedMessageCount(2);
-        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        result.whenAnyExchangeReceived(new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                String msg = exchange.getIn().getBody(String.class);
-                exchange.setException(new ThrottleException(msg));
-            }
-        });
-        
-        // send two messages which will fail
-        sendMessage("Message One");
-        sendMessage("Message Two");
-        
-        // wait long enough to 
-        // have the route shutdown
-        Thread.sleep(3000);
-        
-        // send more messages 
-        // but never should get there
-        // due to open circuit
-        log.debug("sending message three");
-        sendMessage("Message Three");
-        
-        assertMockEndpointsSatisfied();
-        
-        result.reset();
-        result.expectedMessageCount(1);
-        bodies = Arrays.asList(new String[]{"Message Four"}); 
-        result.expectedBodiesReceivedInAnyOrder(bodies);
-        
-        // wait long enough for
-        // half open attempt
-        Thread.sleep(4000);
-        
-        // send message
-        // should get through
-        log.debug("sending message four");
-        sendMessage("Message Four");
-        
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                int threshold = 2;
-                long failureWindow = 30;
-                long halfOpenAfter = 5000;
-                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
-                
-                from(url)
-                    .routePolicy(policy)
-                    .to("log:foo?groupSize=10")
-                    .to("mock:result");
-            }
-        };
-    }
-    
-    protected void sendMessage(String bodyText) {
-        try {
-            template.sendBody(url, bodyText);
-        } catch (Exception e) {
-            log.debug("Error sending:" + e.getCause().getMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
deleted file mode 100644
index e779907..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-public class ThrottleException extends RuntimeException {
-
-    private static final long serialVersionUID = 1993185881371058773L;
-
-    public ThrottleException() {
-        super();
-    }
-
-    public ThrottleException(String message) {
-        super(message);
-    }
-
-    public ThrottleException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ThrottleException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
new file mode 100644
index 0000000..101e9d1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+public class ThrottlingException extends RuntimeException {
+
+    private static final long serialVersionUID = 1993185881371058773L;
+
+    public ThrottlingException() {
+        super();
+    }
+
+    public ThrottlingException(String message) {
+        super(message);
+    }
+
+    public ThrottlingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ThrottlingException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..625989a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+    
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottlingException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but should get there (yet)
+        // due to open circuit
+        // SEDA will queue it up
+        log.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(2);
+        bodies = Arrays.asList(new String[]{"Message Three", "Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+                
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+        
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..6684b6e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.class);
+    
+    private String url = "direct:start";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottlingException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList(new String[]{"Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+                
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+        
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..82c4f5e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenTest.class);
+    
+    private String url = "direct:start";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottlingException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        sendMessage("Message Three");
+        
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList(new String[]{"Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                
+                from(url)
+                    .routePolicy(policy)
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index 91f4606..a2e982a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -24,7 +24,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
 import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,7 +47,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
         context.getShutdownStrategy().setTimeout(1);
     }
 
-    
     @Test
     public void testThrottlingRoutePolicyClosed() throws Exception {
         result.expectedMinimumMessageCount(size);
@@ -60,7 +59,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
-    
     @Test
     public void testOpenCircuitToPreventMessageThree() throws Exception {
         result.reset();
@@ -72,7 +70,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
             @Override
             public void process(Exchange exchange) throws Exception {
                 String msg = exchange.getIn().getBody(String.class);
-                exchange.setException(new ThrottleException(msg));
+                exchange.setException(new ThrottlingException(msg));
             }
         });
         
@@ -115,7 +113,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
         };
     }
     
-    public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+    public class NeverCloseHandler implements ThrottlingExceptionHalfOpenHandler {
 
         @Override
         public boolean isReadyToBeClosed() {


[2/3] camel git commit: Throttling on Exceptions (RoutePolicy)

Posted by da...@apache.org.
Throttling on Exceptions (RoutePolicy)

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49bc7305
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49bc7305
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49bc7305

Branch: refs/heads/master
Commit: 49bc73052d0b7e37cfc6fc2067966f64c75aff27
Parents: a43dc75
Author: CodeSmell <mb...@gmail.com>
Authored: Fri Jan 13 10:28:25 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100

----------------------------------------------------------------------
 .../impl/ThrottingExceptionHalfOpenHandler.java |   5 +
 .../impl/ThrottlingExceptionRoutePolicy.java    | 288 +++++++++++++++++++
 ...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 117 ++++++++
 ...ExceptionRoutePolicyHalfOpenHandlerTest.java | 116 ++++++++
 ...rottingExceptionRoutePolicyHalfOpenTest.java | 105 +++++++
 .../camel/processor/ThrottleException.java      |  22 ++
 .../ThrottlingExceptionRoutePolicyTest.java     | 110 +++++++
 7 files changed, 763 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..0b8019f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -0,0 +1,5 @@
+package org.apache.camel.impl;
+
+public interface ThrottingExceptionHalfOpenHandler {
+    boolean isReadyToBeClosed();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
new file mode 100644
index 0000000..1647f85
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -0,0 +1,288 @@
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
+ * thrown and the threshold setting. 
+ * 
+ * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. 
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move 
+ * to a half open state and attempt to determine if the consumer can be started.
+ * There are two ways to determine if a route can be closed after being opened
+ * (1) start the consumer and check the failure threshold
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler} 
+ * The second option allows a custom check to be performed without having to take on the possibiliy of 
+ * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
+ * to determine if the processes that cause the route to be open are now available  
+ */
+public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+    
+    private static final int STATE_CLOSED = 0;
+    private static final int STATE_HALF_OPEN = 1;
+    private static final int STATE_OPEN = 2;
+    
+    private CamelContext camelContext;
+    private final Lock lock = new ReentrantLock();
+    
+    // configuration
+    private int failureThreshold;
+    private long failureWindow;
+    private long halfOpenAfter;
+    private final List<Class<?>> throttledExceptions;
+    
+    // handler for half open circuit
+    // can be used instead of resuming route
+    // to check on resources
+    ThrottingExceptionHalfOpenHandler halfOpenHandler;
+
+    // stateful information
+    private Timer halfOpenTimer;
+    private AtomicInteger failures = new AtomicInteger();
+    private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+    private long lastFailure;
+    private long openedAt;
+    
+    public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
+        this.throttledExceptions = handledExceptions;
+        this.failureWindow = failureWindow;
+        this.halfOpenAfter = halfOpenAfter;
+        this.failureThreshold = threshold;
+    }
+    
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+        logState();
+    }
+    
+    @Override
+    public void onExchangeDone(Route route, Exchange exchange) {
+        if (hasFailed(exchange)) {
+            // record the failure
+            failures.incrementAndGet();
+            lastFailure = System.currentTimeMillis();
+        } 
+        
+        // check for state change
+        calculateState(route);
+    }
+    
+    /**
+     * uses similar approach as {@link CircuitBreakerLoadBalancer}
+     * if the exchange has an exception that we are watching 
+     * then we count that as a failure otherwise we ignore it
+     * @param exchange
+     * @return
+     */
+    private boolean hasFailed(Exchange exchange) {
+        if (exchange == null) {
+            return false;
+        }
+
+        boolean answer = false;
+
+        if (exchange.getException() != null) {
+            log.debug("exception occured on route: checking to see if I handle that");
+            if (throttledExceptions == null || throttledExceptions.isEmpty()) {
+                // if no exceptions defined then always fail 
+                // (ie) assume we throttle on all exceptions
+                answer = true;
+            } else {
+                for (Class<?> exception : throttledExceptions) {
+                    // will look in exception hierarchy
+                    if (exchange.getException(exception) != null) {
+                        answer = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
+            log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+        }
+        return answer;
+    }
+
+    private void calculateState(Route route) {
+        
+        // have we reached the failure limit?
+        boolean failureLimitReached = isThresholdExceeded();
+        
+        if (state.get() == STATE_CLOSED) {
+            if (failureLimitReached) {
+                log.debug("opening circuit...");
+                openCircuit(route);
+            }
+        } else if (state.get() == STATE_HALF_OPEN) {
+            if (failureLimitReached) {
+                log.debug("opening circuit...");
+                openCircuit(route);
+            } else {
+                log.debug("closing circuit...");
+                closeCircuit(route);
+            }
+        } else if (state.get() == STATE_OPEN) {
+            long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+            if (halfOpenAfter <= elapsedTimeSinceOpened) {
+                log.debug("checking an open circuit...");
+                if (halfOpenHandler != null) {
+                    if (halfOpenHandler.isReadyToBeClosed()) {
+                        log.debug("closing circuit...");
+                        closeCircuit(route);
+                    } else {
+                        log.debug("opening circuit...");
+                        openCircuit(route);
+                    }
+                } else {
+                    log.debug("half opening circuit...");
+                    halfOpenCircuit(route);                    
+                }
+            } 
+        }
+        
+    }
+    
+    protected boolean isThresholdExceeded() {
+        boolean output = false;
+        logState();
+        // failures exceed the threshold 
+        // AND the last of those failures occurred within window
+        if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
+            output = true;
+        }
+        
+        return output;
+    }
+        
+    protected void openCircuit(Route route) {
+        try {
+            lock.lock();
+            stopConsumer(route.getConsumer());
+            state.set(STATE_OPEN);
+            openedAt = System.currentTimeMillis();
+            halfOpenTimer = new Timer();
+            halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    protected void halfOpenCircuit(Route route) {
+        try {
+            lock.lock();
+            startConsumer(route.getConsumer());
+            state.set(STATE_HALF_OPEN);
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    protected void closeCircuit(Route route) {
+        try {
+            lock.lock();
+            startConsumer(route.getConsumer());
+            this.reset();
+            logState();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    /**
+     * reset the route 
+     */
+    private void reset() {
+        failures.set(0);
+        lastFailure = 0;
+        openedAt = 0;
+        state.set(STATE_CLOSED);
+    }
+    
+    private void logState() {
+        if (log.isDebugEnabled()) {
+            log.debug(dumpState());
+        }
+    }
+    
+    public String dumpState() {
+        int num = state.get();
+        String state = stateAsString(num);
+        if (failures.get() > 0) {
+            return String.format("*** State %s, failures %d, last failure %d ms ago", state, failures.get(), System.currentTimeMillis() - lastFailure);
+        } else {
+            return String.format("*** State %s, failures %d", state, failures.get());
+        }
+    }
+    
+    private static String stateAsString(int num) {
+        if (num == STATE_CLOSED) {
+            return "closed";
+        } else if (num == STATE_HALF_OPEN) {
+            return "half opened";
+        } else {
+            return "opened";
+        }
+    }
+    
+    class HalfOpenTask extends TimerTask {
+        private final Route route;
+        public HalfOpenTask(Route route) {
+            this.route = route;
+        }
+        
+        @Override
+        public void run() {
+            calculateState(route);
+            halfOpenTimer.cancel();
+        }
+    }
+    
+    public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+        return halfOpenHandler;
+    }
+
+    public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+        this.halfOpenHandler = halfOpenHandler;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..fa413f3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,117 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+    
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but should get there (yet)
+        // due to open circuit
+        // SEDA will queue it up
+        log.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(2);
+        bodies = Arrays.asList(new String[]{"Message Three", "Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+                
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+        
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..bd19be8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,116 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
+    
+    private String url = "direct:start";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList(new String[]{"Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+                
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+        
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..bc7432a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
+    
+    private String url = "direct:start";
+    private MockEndpoint result;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+    
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        sendMessage("Message Three");
+        
+        assertMockEndpointsSatisfied();
+        
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList(new String[]{"Message Four"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+        
+        // send message
+        // should get through
+        log.debug("sending message four");
+        sendMessage("Message Four");
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                
+                from(url)
+                    .routePolicy(policy)
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            log.debug("Error sending:" + e.getCause().getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
new file mode 100644
index 0000000..7a648f2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -0,0 +1,22 @@
+package org.apache.camel.processor;
+
+public class ThrottleException extends RuntimeException {
+
+    private static final long serialVersionUID = 1993185881371058773L;
+
+    public ThrottleException() {
+        super();
+    }
+
+    public ThrottleException(String message) {
+        super(message);
+    }
+
+    public ThrottleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ThrottleException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
new file mode 100644
index 0000000..aa550aa
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -0,0 +1,110 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
+    private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
+    
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    private int size = 100;
+    
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    
+    @Test
+    public void testThrottlingRoutePolicyClosed() throws Exception {
+        result.expectedMinimumMessageCount(size);
+
+        for (int i = 0; i < size; i++) {
+            template.sendBody(url, "Message " + i);
+            Thread.sleep(3);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    
+    @Test
+    public void testOpenCircuitToPreventMessageThree() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"}); 
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+        
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+        
+        // send two messages which will fail
+        template.sendBody(url, "Message One");
+        template.sendBody(url, "Message Two");
+        
+        // wait long enough to 
+        // have the route shutdown
+        Thread.sleep(3000);
+        
+        // send more messages 
+        // but never should get there
+        // due to open circuit
+        log.debug("sending message three");
+        template.sendBody(url, "Message Three");
+        
+        Thread.sleep(2000);
+        
+        assertMockEndpointsSatisfied();
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new NeverCloseHandler());
+                
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return false;
+        }
+        
+    }
+}