You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/10 08:51:36 UTC

[2/2] git commit: CAMEL-7491 Added an option in throttler to throw RejectedExecutionException instead of delaying the exchange

CAMEL-7491 Added an option in throttler to throw RejectedExecutionException instead of delaying the exchange


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/86797a34
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/86797a34
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/86797a34

Branch: refs/heads/master
Commit: 86797a341f82be858c93ac2778f0d31195e12c25
Parents: 20e6af8
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Jun 10 14:46:34 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Jun 10 14:46:34 2014 +0800

----------------------------------------------------------------------
 .../apache/camel/model/ThrottleDefinition.java  | 29 ++++++++++-
 .../camel/processor/DelayProcessorSupport.java  | 52 +++++++++++---------
 .../org/apache/camel/processor/Throttler.java   | 25 +++++++++-
 .../apache/camel/processor/ThrottlerTest.java   | 34 +++++++++++--
 4 files changed, 109 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 191397a..b829052 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -51,6 +51,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean asyncDelayed;
     @XmlAttribute
     private Boolean callerRunsWhenRejected;
+    @XmlAttribute
+    private Boolean rejectExecution;
     
     public ThrottleDefinition() {
     }
@@ -84,7 +86,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
 
         boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
         ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
-
+        
         // should be default 1000 millis
         long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
 
@@ -94,7 +96,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
 
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, isRejectExecution());
 
         if (getAsyncDelayed() != null) {
             answer.setAsyncDelayed(getAsyncDelayed());
@@ -164,6 +166,19 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         setAsyncDelayed(true);
         return this;
     }
+    
+    /**
+     * Whether or not the throttler throws a RejectedExecutionException when the exchange exceeds the request limit
+     * <p/>
+     * Is by default <tt>false</tt>
+     *
+     * @param rejectExecution whether to throw a RejectedExecutionException if the exchange exceeds the request limit
+     * @return the builder
+     */
+    public ThrottleDefinition rejectExecution(boolean rejectExecution) {
+        setRejectExecution(rejectExecution);
+        return this;
+    }
 
     public ThrottleDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
@@ -174,6 +189,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         setExecutorServiceRef(executorServiceRef);
         return this;
     }
+    
+    
 
     // Properties
     // -------------------------------------------------------------------------
@@ -221,4 +238,12 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     public void setExecutorServiceRef(String executorServiceRef) {
         this.executorServiceRef = executorServiceRef;
     }
+    
+    public boolean isRejectExecution() {
+        return rejectExecution != null ? rejectExecution : false;
+    }
+
+    public void setRejectExecution(Boolean rejectExecution) {
+        this.rejectExecution = rejectExecution;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ff81170..05ba626 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -90,30 +90,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         this.executorService = executorService;
         this.shutdownExecutorService = shutdownExecutorService;
     }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        if (!isRunAllowed()) {
-            exchange.setException(new RejectedExecutionException("Run is not allowed"));
-            callback.done(true);
-            return true;
-        }
-
-        // calculate delay and wait
-        long delay;
-        try {
-            delay = calculateDelay(exchange);
-            if (delay <= 0) {
-                // no delay then continue routing
-                log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
-                return processor.process(exchange, callback);
-            }
-        } catch (Throwable e) {
-            exchange.setException(e);
-            callback.done(true);
-            return true;
-        }
-
+    
+    protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) {
         if (!isAsyncDelayed() || exchange.isTransacted()) {
             // use synchronous delay (also required if using transactions)
             try {
@@ -164,6 +142,32 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         }
     }
 
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            exchange.setException(new RejectedExecutionException("Run is not allowed"));
+            callback.done(true);
+            return true;
+        }
+
+        // calculate delay and wait
+        long delay;
+        try {
+            delay = calculateDelay(exchange);
+            if (delay <= 0) {
+                // no delay then continue routing
+                log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+                return processor.process(exchange, callback);
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+        
+        return processDelay(exchange, callback, delay);
+    }
+
     public boolean isAsyncDelayed() {
         return asyncDelayed;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 6b51a2c..a48f6a5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -43,10 +45,12 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     private Expression maxRequestsPerPeriodExpression;
     private AtomicLong timePeriodMillis = new AtomicLong(1000);
     private volatile TimeSlot slot;
+    private boolean rejectExecution;
 
     public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
-                     ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+                     ScheduledExecutorService executorService, boolean shutdownExecutorService, boolean rejectExecution) {
         super(camelContext, processor, executorService, shutdownExecutorService);
+        this.rejectExecution = rejectExecution;
 
         ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
         this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
@@ -196,4 +200,23 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     TimeSlot getSlot() {
         return this.slot;
     }
+
+    public boolean isRejectExecution() {
+        return rejectExecution;
+    }
+
+    public void setRejectExecution(boolean rejectExecution) {
+        this.rejectExecution = rejectExecution;
+    }
+    
+    @Override
+    protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) {
+        if (isRejectExecution() && delay > 0) {
+            exchange.setException(new RejectedExecutionException("Exceed the max request limit!"));
+            callback.done(true);
+            return true;
+        } else {
+            return super.processDelay(exchange, callback, delay);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 29fdc45..c1ffbc1 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -18,13 +18,13 @@ package org.apache.camel.processor;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.processor.Throttler.TimeSlot;
-
 import static org.apache.camel.builder.Builder.constant;
 
 /**
@@ -37,7 +37,7 @@ public class ThrottlerTest extends ContextTestSupport {
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setResultWaitTime(5000);
+        resultEndpoint.setResultWaitTime(2000);
 
         for (int i = 0; i < messageCount; i++) {
             template.sendBody("seda:a", "<message>" + i + "</message>");
@@ -47,6 +47,25 @@ public class ThrottlerTest extends ContextTestSupport {
         // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
+    
+    public void testSendLotsOfMessagesWithRejctExecution() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(2);
+        resultEndpoint.setResultWaitTime(2000);
+        
+        MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error", MockEndpoint.class);
+        errorEndpoint.expectedMessageCount(4);
+        errorEndpoint.setResultWaitTime(2000);
+
+        for (int i = 0; i < 6; i++) {
+            template.sendBody("direct:start", "<message>" + i + "</message>");
+        }
+
+        // let's pause to give the requests time to be processed
+        // to check that the throttle really does kick in
+        resultEndpoint.assertIsSatisfied();
+        errorEndpoint.assertIsSatisfied();
+    }
 
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -75,7 +94,7 @@ public class ThrottlerTest extends ContextTestSupport {
     }
 
     public void testTimeSlotCalculus() throws Exception {
-        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false);
         // calculate will assign a new slot
         throttler.calculateDelay(new DefaultExchange(context));
         TimeSlot slot = throttler.nextSlot();
@@ -93,7 +112,7 @@ public class ThrottlerTest extends ContextTestSupport {
     }
 
     public void testTimeSlotCalculusForPeriod() throws InterruptedException {
-        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false);
         throttler.calculateDelay(new DefaultExchange(context));
 
         TimeSlot slot = throttler.getSlot();
@@ -202,6 +221,11 @@ public class ThrottlerTest extends ContextTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
+                
+                onException(RejectedExecutionException.class)
+                    .handled(true)
+                    .to("mock:error");
+                
                 // START SNIPPET: ex
                 from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
                 // END SNIPPET: ex
@@ -211,6 +235,8 @@ public class ThrottlerTest extends ContextTestSupport {
                 from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
 
                 from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
+                
+                from("direct:start").throttle(2).timePeriodMillis(10000).rejectExecution(true).to("log:result", "mock:result");
             }
         };
     }