You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 14:55:05 UTC
[2/3] git commit: CAMEL-7160: Fixed throttler with dynamic header
changing rate not as expected. Thanks to Michael Pisula for the patch.
CAMEL-7160: Fixed throttler with dynamic header changing rate not as expected. Thanks to Michael Pisula for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b972cf85
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b972cf85
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b972cf85
Branch: refs/heads/camel-2.12.x
Commit: b972cf85c08f56ec4d97584a40fe42d4deab69fd
Parents: ddabc95
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 14:55:23 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 14:55:41 2014 +0100
----------------------------------------------------------------------
.../org/apache/camel/processor/Throttler.java | 28 +++++++-----
.../apache/camel/processor/ThrottlerTest.java | 46 ++++++++++++++++++--
2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b972cf85/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index ae6bc26..6b51a2c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -35,8 +35,8 @@ import org.apache.camel.util.ObjectHelper;
* as only allowing 100 requests per second; or if huge load can cause a
* particular system to malfunction or to reduce its throughput you might want
* to introduce some throttling.
- *
- * @version
+ *
+ * @version
*/
public class Throttler extends DelayProcessorSupport implements Traceable {
private volatile long maximumRequestsPerPeriod;
@@ -80,7 +80,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
public Expression getMaximumRequestsPerPeriodExpression() {
return maxRequestsPerPeriodExpression;
}
-
+
public long getTimePeriodMillis() {
return timePeriodMillis.get();
}
@@ -116,6 +116,9 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue);
}
+ if (maximumRequestsPerPeriod > longValue) {
+ slot.capacity = 0;
+ }
maximumRequestsPerPeriod = longValue;
}
@@ -131,7 +134,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
return 0;
}
}
-
+
/*
* Determine what the next available time slot is for handling an Exchange
*/
@@ -139,7 +142,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (slot == null) {
slot = new TimeSlot();
}
- if (slot.isFull() || !slot.isActive()) {
+ if (slot.isFull() || !slot.isPast()) {
slot = slot.next();
}
slot.assign();
@@ -150,7 +153,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
* A time slot is capable of handling a number of exchanges within a certain period of time.
*/
protected class TimeSlot {
-
+
private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
private final long duration = Throttler.this.timePeriodMillis.get();
private final long startTime;
@@ -166,7 +169,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
protected void assign() {
capacity--;
}
-
+
/*
* Start the next time slot either now or in the future
* (no time slots are being created in the past)
@@ -174,15 +177,20 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
protected TimeSlot next() {
return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
}
-
+
+ protected boolean isPast() {
+ long current = System.currentTimeMillis();
+ return current < (startTime + duration);
+ }
+
protected boolean isActive() {
long current = System.currentTimeMillis();
return startTime <= current && current < (startTime + duration);
}
-
+
protected boolean isFull() {
return capacity <= 0;
- }
+ }
}
TimeSlot getSlot() {
http://git-wip-us.apache.org/repos/asf/camel/blob/b972cf85/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 01e3469..29fdc45 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -28,7 +28,7 @@ import org.apache.camel.processor.Throttler.TimeSlot;
import static org.apache.camel.builder.Builder.constant;
/**
- * @version
+ * @version
*/
public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
@@ -47,7 +47,7 @@ public class ThrottlerTest extends ContextTestSupport {
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
-
+
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(messageCount);
@@ -85,7 +85,7 @@ public class ThrottlerTest extends ContextTestSupport {
assertSame(slot, throttler.nextSlot());
assertTrue(slot.isFull());
assertTrue(slot.isActive());
-
+
TimeSlot next = throttler.nextSlot();
// now we should have a new slot that starts somewhere in the future
assertNotSame(slot, next);
@@ -159,6 +159,46 @@ public class ThrottlerTest extends ContextTestSupport {
executor.shutdownNow();
}
+ public void testConfigurationWithChangingHeaderExpression() throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+ resultEndpoint.reset();
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 10);
+
+ resultEndpoint.reset();
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+ executor.shutdownNow();
+ }
+
+ private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int
+ throttle) throws InterruptedException {
+ resultEndpoint.expectedMessageCount(messageCount);
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < messageCount; i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue",
+ throttle);
+ }
+ });
+ }
+
+ // let's wait for the exchanges to arrive
+ resultEndpoint.assertIsSatisfied();
+
+ // now assert that they have actually been throttled
+ long minimumTime = (messageCount - 1) * INTERVAL / throttle;
+ // add a little slack
+ long delta = System.currentTimeMillis() - start + 200;
+ assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
+ long maxTime = (messageCount - 1) * INTERVAL / throttle * 3;
+ assertTrue("Should take at most " + maxTime + "ms, was: " + delta, delta <= maxTime);
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {