You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/10 08:51:36 UTC
[2/2] git commit: CAMEL-7491 Added an option in throttler to throw
RejectedExecutionException instead of delaying the exchange
CAMEL-7491 Added an option in throttler to throw RejectedExecutionException instead of delaying the exchange
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/86797a34
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/86797a34
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/86797a34
Branch: refs/heads/master
Commit: 86797a341f82be858c93ac2778f0d31195e12c25
Parents: 20e6af8
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Jun 10 14:46:34 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Jun 10 14:46:34 2014 +0800
----------------------------------------------------------------------
.../apache/camel/model/ThrottleDefinition.java | 29 ++++++++++-
.../camel/processor/DelayProcessorSupport.java | 52 +++++++++++---------
.../org/apache/camel/processor/Throttler.java | 25 +++++++++-
.../apache/camel/processor/ThrottlerTest.java | 34 +++++++++++--
4 files changed, 109 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 191397a..b829052 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -51,6 +51,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
private Boolean asyncDelayed;
@XmlAttribute
private Boolean callerRunsWhenRejected;
+ @XmlAttribute
+ private Boolean rejectExecution;
public ThrottleDefinition() {
}
@@ -84,7 +86,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
-
+
// should be default 1000 millis
long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
@@ -94,7 +96,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
}
- Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
+ Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, isRejectExecution());
if (getAsyncDelayed() != null) {
answer.setAsyncDelayed(getAsyncDelayed());
@@ -164,6 +166,19 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
setAsyncDelayed(true);
return this;
}
+
+ /**
+ * Whether or not the throttler throws a RejectedExecutionException when the exchange exceeds the request limit
+ * <p/>
+ * Is by default <tt>false</tt>
+ *
+ * @param rejectExecution whether to throw the RejectedExecutionException if the exchange exceeds the request limit
+ * @return the builder
+ */
+ public ThrottleDefinition rejectExecution(boolean rejectExecution) {
+ setRejectExecution(rejectExecution);
+ return this;
+ }
public ThrottleDefinition executorService(ExecutorService executorService) {
setExecutorService(executorService);
@@ -174,6 +189,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
setExecutorServiceRef(executorServiceRef);
return this;
}
+
+
// Properties
// -------------------------------------------------------------------------
@@ -221,4 +238,12 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
public void setExecutorServiceRef(String executorServiceRef) {
this.executorServiceRef = executorServiceRef;
}
+
+ public boolean isRejectExecution() {
+ return rejectExecution != null ? rejectExecution : false;
+ }
+
+ public void setRejectExecution(Boolean rejectExecution) {
+ this.rejectExecution = rejectExecution;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ff81170..05ba626 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -90,30 +90,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
this.executorService = executorService;
this.shutdownExecutorService = shutdownExecutorService;
}
-
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- if (!isRunAllowed()) {
- exchange.setException(new RejectedExecutionException("Run is not allowed"));
- callback.done(true);
- return true;
- }
-
- // calculate delay and wait
- long delay;
- try {
- delay = calculateDelay(exchange);
- if (delay <= 0) {
- // no delay then continue routing
- log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
- return processor.process(exchange, callback);
- }
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
-
+
+ protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) {
if (!isAsyncDelayed() || exchange.isTransacted()) {
// use synchronous delay (also required if using transactions)
try {
@@ -164,6 +142,32 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
}
}
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (!isRunAllowed()) {
+ exchange.setException(new RejectedExecutionException("Run is not allowed"));
+ callback.done(true);
+ return true;
+ }
+
+ // calculate delay and wait
+ long delay;
+ try {
+ delay = calculateDelay(exchange);
+ if (delay <= 0) {
+ // no delay then continue routing
+ log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+ return processor.process(exchange, callback);
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
+ return processDelay(exchange, callback, delay);
+ }
+
public boolean isAsyncDelayed() {
return asyncDelayed;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 6b51a2c..a48f6a5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,9 +16,11 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -43,10 +45,12 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
private Expression maxRequestsPerPeriodExpression;
private AtomicLong timePeriodMillis = new AtomicLong(1000);
private volatile TimeSlot slot;
+ private boolean rejectExecution;
public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
- ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+ ScheduledExecutorService executorService, boolean shutdownExecutorService, boolean rejectExecution) {
super(camelContext, processor, executorService, shutdownExecutorService);
+ this.rejectExecution = rejectExecution;
ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
@@ -196,4 +200,23 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
TimeSlot getSlot() {
return this.slot;
}
+
+ public boolean isRejectExecution() {
+ return rejectExecution;
+ }
+
+ public void setRejectExecution(boolean rejectExecution) {
+ this.rejectExecution = rejectExecution;
+ }
+
+ @Override
+ protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) {
+ if (isRejectExecution() && delay > 0) {
+ exchange.setException(new RejectedExecutionException("Exceed the max request limit!"));
+ callback.done(true);
+ return true;
+ } else {
+ return super.processDelay(exchange, callback, delay);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 29fdc45..c1ffbc1 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -18,13 +18,13 @@ package org.apache.camel.processor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.processor.Throttler.TimeSlot;
-
import static org.apache.camel.builder.Builder.constant;
/**
@@ -37,7 +37,7 @@ public class ThrottlerTest extends ContextTestSupport {
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(5000);
+ resultEndpoint.setResultWaitTime(2000);
for (int i = 0; i < messageCount; i++) {
template.sendBody("seda:a", "<message>" + i + "</message>");
@@ -47,6 +47,25 @@ public class ThrottlerTest extends ContextTestSupport {
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
+
+ public void testSendLotsOfMessagesWithRejctExecution() throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(2);
+ resultEndpoint.setResultWaitTime(2000);
+
+ MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error", MockEndpoint.class);
+ errorEndpoint.expectedMessageCount(4);
+ errorEndpoint.setResultWaitTime(2000);
+
+ for (int i = 0; i < 6; i++) {
+ template.sendBody("direct:start", "<message>" + i + "</message>");
+ }
+
+ // lets pause to give the requests time to be processed
+ // to check that the throttle really does kick in
+ resultEndpoint.assertIsSatisfied();
+ errorEndpoint.assertIsSatisfied();
+ }
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -75,7 +94,7 @@ public class ThrottlerTest extends ContextTestSupport {
}
public void testTimeSlotCalculus() throws Exception {
- Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+ Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false);
// calculate will assign a new slot
throttler.calculateDelay(new DefaultExchange(context));
TimeSlot slot = throttler.nextSlot();
@@ -93,7 +112,7 @@ public class ThrottlerTest extends ContextTestSupport {
}
public void testTimeSlotCalculusForPeriod() throws InterruptedException {
- Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+ Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false);
throttler.calculateDelay(new DefaultExchange(context));
TimeSlot slot = throttler.getSlot();
@@ -202,6 +221,11 @@ public class ThrottlerTest extends ContextTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
+
+ onException(RejectedExecutionException.class)
+ .handled(true)
+ .to("mock:error");
+
// START SNIPPET: ex
from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
// END SNIPPET: ex
@@ -211,6 +235,8 @@ public class ThrottlerTest extends ContextTestSupport {
from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
+
+ from("direct:start").throttle(2).timePeriodMillis(10000).rejectExecution(true).to("log:result", "mock:result");
}
};
}