You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/10 12:07:31 UTC

[camel] 01/02: [CAMEL-12125] add keepOpen to endpoint circuit breaker

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9cc0eedfd62be8809db4c9c97ad42eddda121d0f
Author: CodeSmell <mb...@gmail.com>
AuthorDate: Thu Jan 4 17:57:13 2018 -0500

    [CAMEL-12125] add keepOpen to endpoint circuit breaker
---
 .../camel/impl/ThrottlingExceptionRoutePolicy.java | 168 +++++++++++++--------
 ...lingExceptionRoutePolicyKeepOpenOnInitTest.java |  98 ++++++++++++
 ...tlingExceptionRoutePolicyOpenViaConfigTest.java | 104 +++++++++++++
 3 files changed, 309 insertions(+), 61 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 92aba77..0ee7b83 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -16,13 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
@@ -33,38 +26,46 @@ import org.apache.camel.support.RoutePolicySupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
  * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
- * thrown and the threshold setting. 
- * 
+ * thrown and the threshold setting.
+ *
  * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
- * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. 
- * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move 
+ * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
+ * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
  * to a half open state and attempt to determine if the consumer can be started.
  * There are two ways to determine if a route can be closed after being opened
  * (1) start the consumer and check the failure threshold
- * (2) call the {@link ThrottlingExceptionHalfOpenHandler} 
+ * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
  * The second option allows a custom check to be performed without having to take on the possibility of
  * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
- * to determine if the processes that cause the route to be open are now available  
+ * to determine if the processes that cause the route to be open are now available
  */
 public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
     private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
-    
+
     private static final int STATE_CLOSED = 0;
     private static final int STATE_HALF_OPEN = 1;
     private static final int STATE_OPEN = 2;
-    
+
     private CamelContext camelContext;
     private final Lock lock = new ReentrantLock();
-    
+
     // configuration
     private int failureThreshold;
     private long failureWindow;
     private long halfOpenAfter;
     private final List<Class<?>> throttledExceptions;
-    
+
     // handler for half open circuit
     // can be used instead of resuming route
     // to check on resources
@@ -73,17 +74,27 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     // stateful information
     private final AtomicInteger failures = new AtomicInteger();
     private final AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+    private AtomicBoolean keepOpen = new AtomicBoolean(false);
     private volatile Timer halfOpenTimer;
     private volatile long lastFailure;
     private volatile long openedAt;
-    
+
     public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
         this.throttledExceptions = handledExceptions;
         this.failureWindow = failureWindow;
         this.halfOpenAfter = halfOpenAfter;
         this.failureThreshold = threshold;
+        this.keepOpen.set(false);
+    }
+
+    public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) {
+        this.throttledExceptions = handledExceptions;
+        this.failureWindow = failureWindow;
+        this.halfOpenAfter = halfOpenAfter;
+        this.failureThreshold = threshold;
+        this.keepOpen.set(keepOpen);
     }
-    
+
     @Override
     public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -99,22 +110,37 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         LOG.debug("Initializing ThrottlingExceptionRoutePolicy route policy...");
         logState();
     }
-    
+
+    @Override
+    public void onStart(Route route) {
+        // when keepOpen is set, start with the circuit already open
+        if (keepOpen.get()) {
+            openCircuit(route);
+        }
+    }
+
     @Override
     public void onExchangeDone(Route route, Exchange exchange) {
-        if (hasFailed(exchange)) {
-            // record the failure
-            failures.incrementAndGet();
-            lastFailure = System.currentTimeMillis();
-        } 
-        
-        // check for state change
-        calculateState(route);
+        if (keepOpen.get()) {
+            if (state.get() != STATE_OPEN) {
+                LOG.debug("opening circuit b/c keepOpen is on");
+                openCircuit(route);
+            }
+        } else {
+            if (hasFailed(exchange)) {
+                // record the failure
+                failures.incrementAndGet();
+                lastFailure = System.currentTimeMillis();
+            }
+
+            // check for state change
+            calculateState(route);
+        }
     }
-    
+
     /**
      * uses similar approach as {@link CircuitBreakerLoadBalancer}
-     * if the exchange has an exception that we are watching 
+     * if the exchange has an exception that we are watching
      * then we count that as a failure otherwise we ignore it
      */
     private boolean hasFailed(Exchange exchange) {
@@ -126,7 +152,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
 
         if (exchange.getException() != null) {
             if (throttledExceptions == null || throttledExceptions.isEmpty()) {
-                // if no exceptions defined then always fail 
+                // if no exceptions defined then always fail
                 // (ie) assume we throttle on all exceptions
                 answer = true;
             } else {
@@ -148,10 +174,10 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
     }
 
     private void calculateState(Route route) {
-        
+
         // have we reached the failure limit?
         boolean failureLimitReached = isThresholdExceeded();
-        
+
         if (state.get() == STATE_CLOSED) {
             if (failureLimitReached) {
                 LOG.debug("Opening circuit...");
@@ -166,46 +192,52 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
                 closeCircuit(route);
             }
         } else if (state.get() == STATE_OPEN) {
-            long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
-            if (halfOpenAfter <= elapsedTimeSinceOpened) {
-                LOG.debug("Checking an open circuit...");
-                if (halfOpenHandler != null) {
-                    if (halfOpenHandler.isReadyToBeClosed()) {
-                        LOG.debug("Closing circuit...");
-                        closeCircuit(route);
+            if (!keepOpen.get()) {
+                long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
+                if (halfOpenAfter <= elapsedTimeSinceOpened) {
+                    LOG.debug("Checking an open circuit...");
+                    if (halfOpenHandler != null) {
+                        if (halfOpenHandler.isReadyToBeClosed()) {
+                            LOG.debug("Closing circuit...");
+                            closeCircuit(route);
+                        } else {
+                            LOG.debug("Opening circuit...");
+                            openCircuit(route);
+                        }
                     } else {
-                        LOG.debug("Opening circuit...");
-                        openCircuit(route);
+                        LOG.debug("Half opening circuit...");
+                        halfOpenCircuit(route);
                     }
                 } else {
-                    LOG.debug("Half opening circuit...");
-                    halfOpenCircuit(route);                    
+                    log.debug("keeping circuit open (time not elapsed)...");
                 }
-            } 
+            } else {
+                log.debug("keeping circuit open (keepOpen is true)...");
+                this.addHalfOpenTimer(route);
+            }
         }
-        
+
     }
-    
+
     protected boolean isThresholdExceeded() {
         boolean output = false;
         logState();
-        // failures exceed the threshold 
+        // failures exceed the threshold
         // AND the last of those failures occurred within window
         if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
             output = true;
         }
-        
+
         return output;
     }
-        
+
     protected void openCircuit(Route route) {
         try {
             lock.lock();
             suspendOrStopConsumer(route.getConsumer());
             state.set(STATE_OPEN);
             openedAt = System.currentTimeMillis();
-            halfOpenTimer = new Timer();
-            halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+            this.addHalfOpenTimer(route);
             logState();
         } catch (Exception e) {
             handleException(e);
@@ -214,6 +246,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         }
     }
 
+    protected void addHalfOpenTimer(Route route) {
+        halfOpenTimer = new Timer();
+        halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
+    }
+
     protected void halfOpenCircuit(Route route) {
         try {
             lock.lock();
@@ -226,7 +263,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
             lock.unlock();
         }
     }
-    
+
     protected void closeCircuit(Route route) {
         try {
             lock.lock();
@@ -242,13 +279,13 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
             lock.unlock();
         }
     }
-    
+
     private void logState() {
         if (LOG.isDebugEnabled()) {
             LOG.debug(dumpState());
         }
     }
-    
+
     public String dumpState() {
         int num = state.get();
         String routeState = stateAsString(num);
@@ -258,7 +295,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
             return String.format("State %s, failures %d", routeState, failures.get());
         }
     }
-    
+
     private static String stateAsString(int num) {
         if (num == STATE_CLOSED) {
             return "closed";
@@ -268,21 +305,21 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
             return "opened";
         }
     }
-    
+
     class HalfOpenTask extends TimerTask {
         private final Route route;
-        
+
         HalfOpenTask(Route route) {
             this.route = route;
         }
-        
+
         @Override
         public void run() {
-            calculateState(route);
             halfOpenTimer.cancel();
+            calculateState(route);
         }
     }
-    
+
     public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
         return halfOpenHandler;
     }
@@ -291,6 +328,15 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
         this.halfOpenHandler = halfOpenHandler;
     }
 
+    public boolean getKeepOpen() {
+        return this.keepOpen.get();
+    }
+
+    public void setKeepOpen(boolean keepOpen) {
+        log.debug("keep open:" + keepOpen);
+        this.keepOpen.set(keepOpen);
+    }
+
     public int getFailureThreshold() {
         return failureThreshold;
     }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
new file mode 100644
index 0000000..25134d5
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java
@@ -0,0 +1,98 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThrottlingExceptionRoutePolicyKeepOpenOnInitTest extends ContextTestSupport {
+
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    private int size = 5;
+
+    private ThrottlingExceptionRoutePolicy policy;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        this.createPolicy();
+
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    protected void createPolicy() {
+        int threshold = 2;
+        long failureWindow = 30;
+        long halfOpenAfter = 1000;
+        boolean keepOpen = true;
+        policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
+    }
+
+    @Test
+    public void testThrottlingRoutePolicyStartWithAlwaysOpenOn() throws Exception {
+
+        log.debug("---- sending some messages");
+        for (int i = 0; i < size; i++) {
+            template.sendBody(url, "Message " + i);
+            Thread.sleep(3);
+        }
+
+        // gives time for policy half open check to run every second
+        // and should not close b/c keepOpen is true
+        Thread.sleep(2000);
+
+        // the circuit stayed open the whole time (keepOpen is true), so
+        // none of the messages sent above should have reached mock:result
+        result.expectedMessageCount(0);
+        result.setResultWaitTime(1000);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testThrottlingRoutePolicyStartWithAlwaysOpenOnThenClose() throws Exception {
+
+        for (int i = 0; i < size; i++) {
+            template.sendBody(url, "Message " + i);
+            Thread.sleep(3);
+        }
+
+        // gives time for policy half open check to run every second
+        // and should not close b/c keepOpen is true
+        Thread.sleep(2000);
+
+        result.expectedMessageCount(0);
+        result.setResultWaitTime(1500);
+        assertMockEndpointsSatisfied();
+
+        // set keepOpen to false
+        // now half open check will succeed
+        policy.setKeepOpen(false);
+
+        // gives time for policy half open check to run every second
+        // and should close and get all the messages
+        result.expectedMessageCount(5);
+        result.setResultWaitTime(1500);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
new file mode 100644
index 0000000..650d08c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java
@@ -0,0 +1,104 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThrottlingExceptionRoutePolicyOpenViaConfigTest extends ContextTestSupport {
+
+    private String url = "seda:foo?concurrentConsumers=20";
+    private MockEndpoint result;
+    private int size = 5;
+
+    private ThrottlingExceptionRoutePolicy policy;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        this.createPolicy();
+
+        super.setUp();
+        this.setUseRouteBuilder(true);
+        result = getMockEndpoint("mock:result");
+        context.getShutdownStrategy().setTimeout(1);
+    }
+
+    protected void createPolicy() {
+        int threshold = 2;
+        long failureWindow = 30;
+        long halfOpenAfter = 1000;
+        boolean keepOpen = false;
+        policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
+    }
+
+    @Test
+    public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws Exception {
+
+        // send first set of messages
+        // should go through b/c circuit is closed
+        for (int i = 0; i < size; i++) {
+            template.sendBody(url, "MessageRound1 " + i);
+            Thread.sleep(3);
+        }
+        result.expectedMessageCount(size);
+        result.setResultWaitTime(2000);
+        assertMockEndpointsSatisfied();
+
+        // set keepOpen to true
+        policy.setKeepOpen(true);
+
+        // trigger opening of the circuit: the policy only looks at keepOpen
+        // in onExchangeDone, so one more message has to complete first
+        template.sendBody(url, "MessageTrigger");
+
+        // give time for circuit to open
+        Thread.sleep(1000);
+
+        // send next set of messages
+        // should NOT go through b/c circuit is open
+        for (int i = 0; i < size; i++) {
+            template.sendBody(url, "MessageRound2 " + i);
+            Thread.sleep(3);
+        }
+
+        // gives time for policy half open check to run every second
+        // and should not close b/c keepOpen is true
+        Thread.sleep(2000);
+
+        result.expectedMessageCount(size + 1);
+        result.setResultWaitTime(2000);
+        assertMockEndpointsSatisfied();
+
+        // set keepOpen to false
+        policy.setKeepOpen(false);
+
+        // gives time for policy half open check to run every second
+        // and it should close b/c keepOpen is false
+        result.expectedMessageCount(size * 2 + 1);
+        result.setResultWaitTime(2000);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                int threshold = 2;
+                long failureWindow = 30;
+                long halfOpenAfter = 1000;
+                policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+                from(url)
+                    .routePolicy(policy)
+                    .log("${body}")
+                    .to("log:foo?groupSize=10")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.