You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/17 09:59:54 UTC
[1/3] camel git commit: Add Apache License to classes
Repository: camel
Updated Branches:
refs/heads/master a43dc750f -> 351375214
Add Apache License to classes
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72438fd8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72438fd8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72438fd8
Branch: refs/heads/master
Commit: 72438fd8140b531fb633e9d22760fffd8bcb59bb
Parents: 49bc730
Author: CodeSmell <mb...@gmail.com>
Authored: Mon Jan 16 10:24:22 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100
----------------------------------------------------------------------
.../impl/ThrottingExceptionHalfOpenHandler.java | 26 ++++++++++++++
.../impl/ThrottlingExceptionRoutePolicy.java | 36 ++++++++++++++------
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 22 ++++++++++--
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 22 ++++++++++--
...rottingExceptionRoutePolicyHalfOpenTest.java | 22 ++++++++++--
.../camel/processor/ThrottleException.java | 16 +++++++++
.../ThrottlingExceptionRoutePolicyTest.java | 22 ++++++++++--
7 files changed, 144 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
index 0b8019f..82cff2c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -1,5 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.impl;
+/**
+ * used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
+ * to handle the half open circuit state and how to determine if a route
+ * should be closed
+ *
+ */
public interface ThrottingExceptionHalfOpenHandler {
+ /**
+ * check the state of the Camel route
+ * @return true to close the route and false to leave open
+ */
boolean isReadyToBeClosed();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 1647f85..04813b8 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -1,23 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.impl;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.support.RoutePolicySupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
* modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
* this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
@@ -53,7 +68,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
// handler for half open circuit
// can be used instead of resuming route
// to check on resources
- ThrottingExceptionHalfOpenHandler halfOpenHandler;
+ private ThrottingExceptionHalfOpenHandler halfOpenHandler;
// stateful information
private Timer halfOpenTimer;
@@ -266,7 +281,8 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
class HalfOpenTask extends TimerTask {
private final Route route;
- public HalfOpenTask(Route route) {
+
+ HalfOpenTask(Route route) {
this.route = route;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
index fa413f3..bace3a0 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.processor;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-
public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
index bd19be8..b572888 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.processor;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-
public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
index bc7432a..8f4b993 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.processor;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -11,9 +30,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-
public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
index 7a648f2..e779907 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.processor;
public class ThrottleException extends RuntimeException {
http://git-wip-us.apache.org/repos/asf/camel/blob/72438fd8/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index aa550aa..91f4606 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -1,5 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.processor;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -12,9 +31,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-
public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
[3/3] camel git commit: Fixed typo in names. This closes #1400
Posted by da...@apache.org.
Fixed typo in names. This closes #1400
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/35137521
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/35137521
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/35137521
Branch: refs/heads/master
Commit: 351375214872adc7a16ac18839cf9a8552ad4dc8
Parents: 72438fd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 17 10:26:38 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:26:38 2017 +0100
----------------------------------------------------------------------
.../impl/ThrottingExceptionHalfOpenHandler.java | 31 -----
.../ThrottlingExceptionHalfOpenHandler.java | 32 +++++
.../impl/ThrottlingExceptionRoutePolicy.java | 36 ++---
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 -------------------
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ------------------
...rottingExceptionRoutePolicyHalfOpenTest.java | 121 -----------------
.../camel/processor/ThrottleException.java | 38 ------
.../camel/processor/ThrottlingException.java | 38 ++++++
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 +++++++++++++++++++
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ++++++++++++++++++
...ottlingExceptionRoutePolicyHalfOpenTest.java | 121 +++++++++++++++++
.../ThrottlingExceptionRoutePolicyTest.java | 8 +-
12 files changed, 477 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
deleted file mode 100644
index 82cff2c..0000000
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-/**
- * used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
- * to handle the half open circuit state and how to determine if a route
- * should be closed
- *
- */
-public interface ThrottingExceptionHalfOpenHandler {
- /**
- * check the state of the Camel route
- * @return true to close the route and false to leave open
- */
- boolean isReadyToBeClosed();
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..d9d6269
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+/**
+ * Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
+ * to handle the half open circuit state and how to determine if a route
+ * should be closed
+ *
+ */
+public interface ThrottlingExceptionHalfOpenHandler {
+
+ /**
+ * Check the state of the Camel route
+ * @return true to close the route and false to leave open
+ */
+ boolean isReadyToBeClosed();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 04813b8..cda8018 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
* this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
* thrown and the threshold setting.
*
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
* to determine if the processes that cause the route to be open are now available
*/
public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
- private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
private static final int STATE_CLOSED = 0;
private static final int STATE_HALF_OPEN = 1;
@@ -68,7 +68,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
// handler for half open circuit
// can be used instead of resuming route
// to check on resources
- private ThrottingExceptionHalfOpenHandler halfOpenHandler;
+ private ThrottlingExceptionHalfOpenHandler halfOpenHandler;
// stateful information
private Timer halfOpenTimer;
@@ -96,7 +96,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
@Override
public void onInit(Route route) {
- log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+ LOG.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
logState();
}
@@ -127,7 +127,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
boolean answer = false;
if (exchange.getException() != null) {
- log.debug("exception occured on route: checking to see if I handle that");
+ LOG.debug("exception occured on route: checking to see if I handle that");
if (throttledExceptions == null || throttledExceptions.isEmpty()) {
// if no exceptions defined then always fail
// (ie) assume we throttle on all exceptions
@@ -143,9 +143,9 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
}
- if (log.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
- log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+ LOG.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
}
return answer;
}
@@ -157,31 +157,31 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
if (state.get() == STATE_CLOSED) {
if (failureLimitReached) {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
}
} else if (state.get() == STATE_HALF_OPEN) {
if (failureLimitReached) {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
} else {
- log.debug("closing circuit...");
+ LOG.debug("closing circuit...");
closeCircuit(route);
}
} else if (state.get() == STATE_OPEN) {
long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
if (halfOpenAfter <= elapsedTimeSinceOpened) {
- log.debug("checking an open circuit...");
+ LOG.debug("checking an open circuit...");
if (halfOpenHandler != null) {
if (halfOpenHandler.isReadyToBeClosed()) {
- log.debug("closing circuit...");
+ LOG.debug("closing circuit...");
closeCircuit(route);
} else {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
}
} else {
- log.debug("half opening circuit...");
+ LOG.debug("half opening circuit...");
halfOpenCircuit(route);
}
}
@@ -254,8 +254,8 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
private void logState() {
- if (log.isDebugEnabled()) {
- log.debug(dumpState());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(dumpState());
}
}
@@ -293,11 +293,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
}
- public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+ public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
return halfOpenHandler;
}
- public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+ public void setHalfOpenHandler(ThrottlingExceptionHalfOpenHandler halfOpenHandler) {
this.halfOpenHandler = halfOpenHandler;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
deleted file mode 100644
index bace3a0..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
-
- private String url = "seda:foo?concurrentConsumers=20";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but should get there (yet)
- // due to open circuit
- // SEDA will queue it up
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(2);
- bodies = Arrays.asList(new String[]{"Message Three", "Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
- policy.setHalfOpenHandler(new AlwaysCloseHandler());
-
- from(url)
- .routePolicy(policy)
- .log("${body}")
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
- @Override
- public boolean isReadyToBeClosed() {
- return true;
- }
-
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
deleted file mode 100644
index b572888..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
-
- private String url = "direct:start";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but never should get there
- // due to open circuit
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(1);
- bodies = Arrays.asList(new String[]{"Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
- policy.setHalfOpenHandler(new AlwaysCloseHandler());
-
- from(url)
- .routePolicy(policy)
- .log("${body}")
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
- @Override
- public boolean isReadyToBeClosed() {
- return true;
- }
-
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
deleted file mode 100644
index 8f4b993..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
-
- private String url = "direct:start";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.reset();
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but never should get there
- // due to open circuit
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(1);
- bodies = Arrays.asList(new String[]{"Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
-
- from(url)
- .routePolicy(policy)
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
deleted file mode 100644
index e779907..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-public class ThrottleException extends RuntimeException {
-
- private static final long serialVersionUID = 1993185881371058773L;
-
- public ThrottleException() {
- super();
- }
-
- public ThrottleException(String message) {
- super(message);
- }
-
- public ThrottleException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ThrottleException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
new file mode 100644
index 0000000..101e9d1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+public class ThrottlingException extends RuntimeException {
+
+ private static final long serialVersionUID = 1993185881371058773L;
+
+ public ThrottlingException() {
+ super();
+ }
+
+ public ThrottlingException(String message) {
+ super(message);
+ }
+
+ public ThrottlingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ThrottlingException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..625989a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but should not get there (yet)
+ // due to open circuit
+ // SEDA will queue it up
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(2);
+ bodies = Arrays.asList(new String[]{"Message Three", "Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..6684b6e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but should never get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..82c4f5e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.reset();
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but should never get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+ from(url)
+ .routePolicy(policy)
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index 91f4606..a2e982a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -24,7 +24,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +47,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
context.getShutdownStrategy().setTimeout(1);
}
-
@Test
public void testThrottlingRoutePolicyClosed() throws Exception {
result.expectedMinimumMessageCount(size);
@@ -60,7 +59,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
-
@Test
public void testOpenCircuitToPreventMessageThree() throws Exception {
result.reset();
@@ -72,7 +70,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
@Override
public void process(Exchange exchange) throws Exception {
String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
+ exchange.setException(new ThrottlingException(msg));
}
});
@@ -115,7 +113,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
};
}
- public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+ public class NeverCloseHandler implements ThrottlingExceptionHalfOpenHandler {
@Override
public boolean isReadyToBeClosed() {
[2/3] camel git commit: Throttling on Exceptions (RoutePolicy)
Posted by da...@apache.org.
Throttling on Exceptions (RoutePolicy)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49bc7305
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49bc7305
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49bc7305
Branch: refs/heads/master
Commit: 49bc73052d0b7e37cfc6fc2067966f64c75aff27
Parents: a43dc75
Author: CodeSmell <mb...@gmail.com>
Authored: Fri Jan 13 10:28:25 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:18:15 2017 +0100
----------------------------------------------------------------------
.../impl/ThrottingExceptionHalfOpenHandler.java | 5 +
.../impl/ThrottlingExceptionRoutePolicy.java | 288 +++++++++++++++++++
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 117 ++++++++
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 116 ++++++++
...rottingExceptionRoutePolicyHalfOpenTest.java | 105 +++++++
.../camel/processor/ThrottleException.java | 22 ++
.../ThrottlingExceptionRoutePolicyTest.java | 110 +++++++
7 files changed, 763 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..0b8019f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
@@ -0,0 +1,5 @@
+package org.apache.camel.impl;
+
+public interface ThrottingExceptionHalfOpenHandler {
+ boolean isReadyToBeClosed();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
new file mode 100644
index 0000000..1647f85
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -0,0 +1,288 @@
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
+ * thrown and the threshold setting.
+ *
+ * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
+ * to a half open state and attempt to determine if the consumer can be started.
+ * There are two ways to determine if a route can be closed after being opened
+ * (1) start the consumer and check the failure threshold
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
+ * The second option allows a custom check to be performed without having to take on the possibiliy of
+ * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
+ * to determine if the processes that cause the route to be open are now available
+ */
+public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+
+ private static final int STATE_CLOSED = 0;
+ private static final int STATE_HALF_OPEN = 1;
+ private static final int STATE_OPEN = 2;
+
+ private CamelContext camelContext;
+ private final Lock lock = new ReentrantLock();
+
+ // configuration
+ private int failureThreshold;
+ private long failureWindow;
+ private long halfOpenAfter;
+ private final List<Class<?>> throttledExceptions;
+
+ // handler for half open circuit
+ // can be used instead of resuming route
+ // to check on resources
+ ThrottingExceptionHalfOpenHandler halfOpenHandler;
+
+ // stateful information
+ private Timer halfOpenTimer;
+ private AtomicInteger failures = new AtomicInteger();
+ private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+ private long lastFailure;
+ private long openedAt;
+
+ public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
+ this.throttledExceptions = handledExceptions;
+ this.failureWindow = failureWindow;
+ this.halfOpenAfter = halfOpenAfter;
+ this.failureThreshold = threshold;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+ logState();
+ }
+
+ @Override
+ public void onExchangeDone(Route route, Exchange exchange) {
+ if (hasFailed(exchange)) {
+ // record the failure
+ failures.incrementAndGet();
+ lastFailure = System.currentTimeMillis();
+ }
+
+ // check for state change
+ calculateState(route);
+ }
+
+ /**
+ * uses similar approach as {@link CircuitBreakerLoadBalancer}
+ * if the exchange has an exception that we are watching
+ * then we count that as a failure otherwise we ignore it
+ * @param exchange
+ * @return
+ */
+ private boolean hasFailed(Exchange exchange) {
+ if (exchange == null) {
+ return false;
+ }
+
+ boolean answer = false;
+
+ if (exchange.getException() != null) {
+ log.debug("exception occured on route: checking to see if I handle that");
+ if (throttledExceptions == null || throttledExceptions.isEmpty()) {
+ // if no exceptions defined then always fail
+ // (ie) assume we throttle on all exceptions
+ answer = true;
+ } else {
+ for (Class<?> exception : throttledExceptions) {
+ // will look in exception hierarchy
+ if (exchange.getException(exception) != null) {
+ answer = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
+ log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+ }
+ return answer;
+ }
+
+ private void calculateState(Route route) {
+
+ // have we reached the failure limit?
+ boolean failureLimitReached = isThresholdExceeded();
+
+ if (state.get() == STATE_CLOSED) {
+ if (failureLimitReached) {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ }
+ } else if (state.get() == STATE_HALF_OPEN) {
+ if (failureLimitReached) {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ } else {
+ log.debug("closing circuit...");
+ closeCircuit(route);
+ }
+ } else if (state.get() == STATE_OPEN) {
+ long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+ if (halfOpenAfter <= elapsedTimeSinceOpened) {
+ log.debug("checking an open circuit...");
+ if (halfOpenHandler != null) {
+ if (halfOpenHandler.isReadyToBeClosed()) {
+ log.debug("closing circuit...");
+ closeCircuit(route);
+ } else {
+ log.debug("opening circuit...");
+ openCircuit(route);
+ }
+ } else {
+ log.debug("half opening circuit...");
+ halfOpenCircuit(route);
+ }
+ }
+ }
+
+ }
+
+ protected boolean isThresholdExceeded() {
+ boolean output = false;
+ logState();
+ // failures exceed the threshold
+ // AND the last of those failures occurred within window
+ if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
+ output = true;
+ }
+
+ return output;
+ }
+
+ protected void openCircuit(Route route) {
+ try {
+ lock.lock();
+ stopConsumer(route.getConsumer());
+ state.set(STATE_OPEN);
+ openedAt = System.currentTimeMillis();
+ halfOpenTimer = new Timer();
+ halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected void halfOpenCircuit(Route route) {
+ try {
+ lock.lock();
+ startConsumer(route.getConsumer());
+ state.set(STATE_HALF_OPEN);
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected void closeCircuit(Route route) {
+ try {
+ lock.lock();
+ startConsumer(route.getConsumer());
+ this.reset();
+ logState();
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * reset the route
+ */
+ private void reset() {
+ failures.set(0);
+ lastFailure = 0;
+ openedAt = 0;
+ state.set(STATE_CLOSED);
+ }
+
+ private void logState() {
+ if (log.isDebugEnabled()) {
+ log.debug(dumpState());
+ }
+ }
+
+ public String dumpState() {
+ int num = state.get();
+ String state = stateAsString(num);
+ if (failures.get() > 0) {
+ return String.format("*** State %s, failures %d, last failure %d ms ago", state, failures.get(), System.currentTimeMillis() - lastFailure);
+ } else {
+ return String.format("*** State %s, failures %d", state, failures.get());
+ }
+ }
+
+ private static String stateAsString(int num) {
+ if (num == STATE_CLOSED) {
+ return "closed";
+ } else if (num == STATE_HALF_OPEN) {
+ return "half opened";
+ } else {
+ return "opened";
+ }
+ }
+
+ class HalfOpenTask extends TimerTask {
+ private final Route route;
+ public HalfOpenTask(Route route) {
+ this.route = route;
+ }
+
+ @Override
+ public void run() {
+ calculateState(route);
+ halfOpenTimer.cancel();
+ }
+ }
+
+ public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+ return halfOpenHandler;
+ }
+
+ public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+ this.halfOpenHandler = halfOpenHandler;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..fa413f3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,117 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link ThrottlingExceptionRoutePolicy} on a SEDA route with a
+ * {@link ThrottingExceptionHalfOpenHandler} that always reports the circuit as ready to be closed.
+ */
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // every exchange reaching the mock is failed with a ThrottleException
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but they should not get there (yet)
+        // due to open circuit
+        // SEDA will queue it up
+        LOG.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        result.expectedMessageCount(2);
+        bodies = Arrays.asList("Message Three", "Message Four");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+
+        // send message
+        // should get through
+        LOG.debug("sending message four");
+        sendMessage("Message Four");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Half-open handler that always allows the circuit to be closed.
+     */
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+
+    }
+
+    /**
+     * Sends the body to the route and logs, rather than propagates, any failure.
+     */
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            // the exception may have no cause; fall back to the exception itself to avoid an NPE
+            Throwable reason = e.getCause() != null ? e.getCause() : e;
+            LOG.debug("Error sending:" + reason.getMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..bd19be8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,116 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link ThrottlingExceptionRoutePolicy} on a direct route with a
+ * {@link ThrottingExceptionHalfOpenHandler} that always reports the circuit as ready to be closed.
+ */
+public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
+
+    private String url = "direct:start";
+    private MockEndpoint result;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // every exchange reaching the mock is failed with a ThrottleException
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but never should get there
+        // due to open circuit
+        LOG.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList("Message Four");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+
+        // send message
+        // should get through
+        LOG.debug("sending message four");
+        sendMessage("Message Four");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Half-open handler that always allows the circuit to be closed.
+     */
+    public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return true;
+        }
+
+    }
+
+    /**
+     * Sends the body to the route and logs, rather than propagates, any failure.
+     */
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            // the exception may have no cause; fall back to the exception itself to avoid an NPE
+            Throwable reason = e.getCause() != null ? e.getCause() : e;
+            LOG.debug("Error sending:" + reason.getMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..bc7432a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link ThrottlingExceptionRoutePolicy} on a direct route without a half-open handler,
+ * so the policy itself half opens the circuit after the configured delay.
+ */
+public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
+
+    private String url = "direct:start";
+    private MockEndpoint result;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    @Test
+    public void testHalfOpenCircuit() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> bodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // every exchange reaching the mock is failed with a ThrottleException
+        result.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String msg = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(msg));
+            }
+        });
+
+        // send two messages which will fail
+        sendMessage("Message One");
+        sendMessage("Message Two");
+
+        // wait long enough to
+        // have the route shutdown
+        Thread.sleep(3000);
+
+        // send more messages
+        // but never should get there
+        // due to open circuit
+        LOG.debug("sending message three");
+        sendMessage("Message Three");
+
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        result.expectedMessageCount(1);
+        bodies = Arrays.asList("Message Four");
+        result.expectedBodiesReceivedInAnyOrder(bodies);
+
+        // wait long enough for
+        // half open attempt
+        Thread.sleep(4000);
+
+        // send message
+        // should get through
+        LOG.debug("sending message four");
+        sendMessage("Message Four");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+                from(url)
+                    .routePolicy(policy)
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Sends the body to the route and logs, rather than propagates, any failure.
+     */
+    protected void sendMessage(String bodyText) {
+        try {
+            template.sendBody(url, bodyText);
+        } catch (Exception e) {
+            // the exception may have no cause; fall back to the exception itself to avoid an NPE
+            Throwable reason = e.getCause() != null ? e.getCause() : e;
+            LOG.debug("Error sending:" + reason.getMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
new file mode 100644
index 0000000..7a648f2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
@@ -0,0 +1,22 @@
+package org.apache.camel.processor;
+
+/**
+ * Unchecked exception the throttling route policy tests set on exchanges to simulate a failure.
+ */
+public class ThrottleException extends RuntimeException {
+
+    private static final long serialVersionUID = 1993185881371058773L;
+
+    /** Creates an exception with neither a message nor a cause. */
+    public ThrottleException() {
+        // the superclass no-arg constructor is invoked implicitly
+    }
+
+    /**
+     * Creates an exception with a message and a cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public ThrottleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception with a message only.
+     *
+     * @param message the detail message
+     */
+    public ThrottleException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception wrapping a cause.
+     *
+     * @param cause the underlying cause
+     */
+    public ThrottleException(Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/49bc7305/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
new file mode 100644
index 0000000..aa550aa
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -0,0 +1,110 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link ThrottlingExceptionRoutePolicy} on a SEDA route with a half-open handler that
+ * never allows the circuit to be closed.
+ */
+public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyTest.class);
+
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    private int size = 100;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    /**
+     * With no failures, all messages must reach the mock endpoint.
+     */
+    @Test
+    public void testThrottlingRoutePolicyClosed() throws Exception {
+        result.expectedMinimumMessageCount(size);
+
+        int sent = 0;
+        while (sent < size) {
+            template.sendBody(url, "Message " + sent);
+            Thread.sleep(3);
+            sent++;
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * After two failing messages, a third message must not reach the mock endpoint.
+     */
+    @Test
+    public void testOpenCircuitToPreventMessageThree() throws Exception {
+        result.reset();
+        result.expectedMessageCount(2);
+        List<String> expectedBodies = Arrays.asList("Message One", "Message Two");
+        result.expectedBodiesReceivedInAnyOrder(expectedBodies);
+
+        result.whenAnyExchangeReceived(failWithThrottleException());
+
+        // the first two messages fail
+        template.sendBody(url, "Message One");
+        template.sendBody(url, "Message Two");
+
+        // give the route time to shut down
+        Thread.sleep(3000);
+
+        // this one must never arrive because the circuit is open
+        LOG.debug("sending message three");
+        template.sendBody(url, "Message Three");
+
+        Thread.sleep(2000);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Builds a processor that sets a {@link ThrottleException} carrying the message body on
+     * every exchange it receives.
+     */
+    private Processor failWithThrottleException() {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String body = exchange.getIn().getBody(String.class);
+                exchange.setException(new ThrottleException(body));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 5000;
+                ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+                policy.setHalfOpenHandler(new NeverCloseHandler());
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Half-open handler that never allows the circuit to be closed.
+     */
+    public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+
+        @Override
+        public boolean isReadyToBeClosed() {
+            return false;
+        }
+
+    }
+}