You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/10 12:07:31 UTC
[camel] 01/02: [CAMEL-12125] add keepOpen to endpoint circuit breaker
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9cc0eedfd62be8809db4c9c97ad42eddda121d0f
Author: CodeSmell <mb...@gmail.com>
AuthorDate: Thu Jan 4 17:57:13 2018 -0500
[CAMEL-12125] add keepOpen to endpoint circuit breaker
---
.../camel/impl/ThrottlingExceptionRoutePolicy.java | 168 +++++++++++++--------
...lingExceptionRoutePolicyKeepOpenOnInitTest.java | 98 ++++++++++++
...tlingExceptionRoutePolicyOpenViaConfigTest.java | 104 +++++++++++++
3 files changed, 309 insertions(+), 61 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 92aba77..0ee7b83 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -16,13 +16,6 @@
*/
package org.apache.camel.impl;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
@@ -33,38 +26,46 @@ import org.apache.camel.support.RoutePolicySupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
* this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
- * thrown and the threshold setting.
- *
+ * thrown and the threshold setting.
+ *
* the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
- * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
- * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
* to a half open state and attempt to determine if the consumer can be started.
* There are two ways to determine if a route can be closed after being opened
* (1) start the consumer and check the failure threshold
- * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
* The second option allows a custom check to be performed without having to take on the possibility of
* multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
- * to determine if the processes that cause the route to be open are now available
+ * to determine if the processes that cause the route to be open are now available
*/
public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
-
+
private static final int STATE_CLOSED = 0;
private static final int STATE_HALF_OPEN = 1;
private static final int STATE_OPEN = 2;
-
+
private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
-
+
// configuration
private int failureThreshold;
private long failureWindow;
private long halfOpenAfter;
private final List<Class<?>> throttledExceptions;
-
+
// handler for half open circuit
// can be used instead of resuming route
// to check on resources
@@ -73,17 +74,27 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
// stateful information
private final AtomicInteger failures = new AtomicInteger();
private final AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+ private AtomicBoolean keepOpen = new AtomicBoolean(false);
private volatile Timer halfOpenTimer;
private volatile long lastFailure;
private volatile long openedAt;
-
+
public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
this.throttledExceptions = handledExceptions;
this.failureWindow = failureWindow;
this.halfOpenAfter = halfOpenAfter;
this.failureThreshold = threshold;
+ this.keepOpen.set(false);
+ }
+
+ public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) {
+ this.throttledExceptions = handledExceptions;
+ this.failureWindow = failureWindow;
+ this.halfOpenAfter = halfOpenAfter;
+ this.failureThreshold = threshold;
+ this.keepOpen.set(keepOpen);
}
-
+
@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
@@ -99,22 +110,37 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
LOG.debug("Initializing ThrottlingExceptionRoutePolicy route policy...");
logState();
}
-
+
+ @Override
+ public void onStart(Route route) {
+ // if keepOpen then start w/ the circuit open
+ if (keepOpen.get()) {
+ openCircuit(route);
+ }
+ }
+
@Override
public void onExchangeDone(Route route, Exchange exchange) {
- if (hasFailed(exchange)) {
- // record the failure
- failures.incrementAndGet();
- lastFailure = System.currentTimeMillis();
- }
-
- // check for state change
- calculateState(route);
+ if (keepOpen.get()) {
+ if (state.get() != STATE_OPEN) {
+ LOG.debug("opening circuit b/c keepOpen is on");
+ openCircuit(route);
+ }
+ } else {
+ if (hasFailed(exchange)) {
+ // record the failure
+ failures.incrementAndGet();
+ lastFailure = System.currentTimeMillis();
+ }
+
+ // check for state change
+ calculateState(route);
+ }
}
-
+
/**
* uses similar approach as {@link CircuitBreakerLoadBalancer}
- * if the exchange has an exception that we are watching
+ * if the exchange has an exception that we are watching
* then we count that as a failure otherwise we ignore it
*/
private boolean hasFailed(Exchange exchange) {
@@ -126,7 +152,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
if (exchange.getException() != null) {
if (throttledExceptions == null || throttledExceptions.isEmpty()) {
- // if no exceptions defined then always fail
+ // if no exceptions defined then always fail
// (ie) assume we throttle on all exceptions
answer = true;
} else {
@@ -148,10 +174,10 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
private void calculateState(Route route) {
-
+
// have we reached the failure limit?
boolean failureLimitReached = isThresholdExceeded();
-
+
if (state.get() == STATE_CLOSED) {
if (failureLimitReached) {
LOG.debug("Opening circuit...");
@@ -166,46 +192,52 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
closeCircuit(route);
}
} else if (state.get() == STATE_OPEN) {
- long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
- if (halfOpenAfter <= elapsedTimeSinceOpened) {
- LOG.debug("Checking an open circuit...");
- if (halfOpenHandler != null) {
- if (halfOpenHandler.isReadyToBeClosed()) {
- LOG.debug("Closing circuit...");
- closeCircuit(route);
+ if (!keepOpen.get()) {
+ long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+ if (halfOpenAfter <= elapsedTimeSinceOpened) {
+ LOG.debug("Checking an open circuit...");
+ if (halfOpenHandler != null) {
+ if (halfOpenHandler.isReadyToBeClosed()) {
+ LOG.debug("Closing circuit...");
+ closeCircuit(route);
+ } else {
+ LOG.debug("Opening circuit...");
+ openCircuit(route);
+ }
} else {
- LOG.debug("Opening circuit...");
- openCircuit(route);
+ LOG.debug("Half opening circuit...");
+ halfOpenCircuit(route);
}
} else {
- LOG.debug("Half opening circuit...");
- halfOpenCircuit(route);
+ log.debug("keeping circuit open (time not elapsed)...");
}
- }
+ } else {
+ log.debug("keeping circuit open (keepOpen is true)...");
+ this.addHalfOpenTimer(route);
+ }
}
-
+
}
-
+
protected boolean isThresholdExceeded() {
boolean output = false;
logState();
- // failures exceed the threshold
+ // failures exceed the threshold
// AND the last of those failures occurred within window
if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
output = true;
}
-
+
return output;
}
-
+
protected void openCircuit(Route route) {
try {
lock.lock();
suspendOrStopConsumer(route.getConsumer());
state.set(STATE_OPEN);
openedAt = System.currentTimeMillis();
- halfOpenTimer = new Timer();
- halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+ this.addHalfOpenTimer(route);
logState();
} catch (Exception e) {
handleException(e);
@@ -214,6 +246,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
}
+ protected void addHalfOpenTimer(Route route) {
+ halfOpenTimer = new Timer();
+ halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+ }
+
protected void halfOpenCircuit(Route route) {
try {
lock.lock();
@@ -226,7 +263,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
lock.unlock();
}
}
-
+
protected void closeCircuit(Route route) {
try {
lock.lock();
@@ -242,13 +279,13 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
lock.unlock();
}
}
-
+
private void logState() {
if (LOG.isDebugEnabled()) {
LOG.debug(dumpState());
}
}
-
+
public String dumpState() {
int num = state.get();
String routeState = stateAsString(num);
@@ -258,7 +295,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
return String.format("State %s, failures %d", routeState, failures.get());
}
}
-
+
private static String stateAsString(int num) {
if (num == STATE_CLOSED) {
return "closed";
@@ -268,21 +305,21 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
return "opened";
}
}
-
+
class HalfOpenTask extends TimerTask {
private final Route route;
-
+
HalfOpenTask(Route route) {
this.route = route;
}
-
+
@Override
public void run() {
- calculateState(route);
halfOpenTimer.cancel();
+ calculateState(route);
}
}
-
+
public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
return halfOpenHandler;
}
@@ -291,6 +328,15 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
this.halfOpenHandler = halfOpenHandler;
}
+ public boolean getKeepOpen() {
+ return this.keepOpen.get();
+ }
+
+ public void setKeepOpen(boolean keepOpen) {
+ log.debug("keep open:" + keepOpen);
+ this.keepOpen.set(keepOpen);
+ }
+
public int getFailureThreshold() {
return failureThreshold;
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
new file mode 100644
index 0000000..25134d5
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
@@ -0,0 +1,98 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThrottlingExceptionRoutePolicyKeepOpenOnInitTest extends ContextTestSupport {
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+ private int size = 5;
+
+ private ThrottlingExceptionRoutePolicy policy;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ this.createPolicy();
+
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ protected void createPolicy() {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 1000;
+ boolean keepOpen = true;
+ policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
+ }
+
+ @Test
+ public void testThrottlingRoutePolicyStartWithAlwaysOpenOn() throws Exception {
+
+ log.debug("---- sending some messages");
+ for (int i = 0; i < size; i++) {
+ template.sendBody(url, "Message " + i);
+ Thread.sleep(3);
+ }
+
+ // gives time for policy half open check to run every second
+ // and should not close b/c keepOpen is true
+ Thread.sleep(2000);
+
+ // gives time for policy half open check to run every second
+ // but it should never close b/c keepOpen is true
+ result.expectedMessageCount(0);
+ result.setResultWaitTime(1000);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testThrottlingRoutePolicyStartWithAlwaysOpenOnThenClose() throws Exception {
+
+ for (int i = 0; i < size; i++) {
+ template.sendBody(url, "Message " + i);
+ Thread.sleep(3);
+ }
+
+ // gives time for policy half open check to run every second
+ // and should not close b/c keepOpen is true
+ Thread.sleep(2000);
+
+ result.expectedMessageCount(0);
+ result.setResultWaitTime(1500);
+ assertMockEndpointsSatisfied();
+
+ // set keepOpen to false
+ // now half open check will succeed
+ policy.setKeepOpen(false);
+
+ // gives time for policy half open check to run every second
+ // and should close and get all the messages
+ result.expectedMessageCount(5);
+ result.setResultWaitTime(1500);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
new file mode 100644
index 0000000..650d08c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
@@ -0,0 +1,104 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThrottlingExceptionRoutePolicyOpenViaConfigTest extends ContextTestSupport {
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+ private int size = 5;
+
+ private ThrottlingExceptionRoutePolicy policy;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ this.createPolicy();
+
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ protected void createPolicy() {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 1000;
+ boolean keepOpen = false;
+ policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
+ }
+
+ @Test
+ public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws Exception {
+
+ // send first set of messages
+ // should go through b/c circuit is closed
+ for (int i = 0; i < size; i++) {
+ template.sendBody(url, "MessageRound1 " + i);
+ Thread.sleep(3);
+ }
+ result.expectedMessageCount(size);
+ result.setResultWaitTime(2000);
+ assertMockEndpointsSatisfied();
+
+ // set keepOpen to true
+ policy.setKeepOpen(true);
+
+ // trigger opening circuit
+ // by sending another message
+ template.sendBody(url, "MessageTrigger");
+
+ // give time for circuit to open
+ Thread.sleep(1000);
+
+ // send next set of messages
+ // should NOT go through b/c circuit is open
+ for (int i = 0; i < size; i++) {
+ template.sendBody(url, "MessageRound2 " + i);
+ Thread.sleep(3);
+ }
+
+ // gives time for policy half open check to run every second
+ // and should not close b/c keepOpen is true
+ Thread.sleep(2000);
+
+ result.expectedMessageCount(size + 1);
+ result.setResultWaitTime(2000);
+ assertMockEndpointsSatisfied();
+
+ // set keepOpen to false
+ policy.setKeepOpen(false);
+
+ // gives time for policy half open check to run every second
+ // and it should close b/c keepOpen is false
+ result.expectedMessageCount(size * 2 + 1);
+ result.setResultWaitTime(2000);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 1000;
+ policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.