You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 14:55:06 UTC
[3/3] git commit: CAMEL-7160: Fixed throttler with dynamic header changing rate not as expected. Thanks to Michael Pisula for the patch.
CAMEL-7160: Fixed throttler with dynamic header changing rate not as expected. Thanks to Michael Pisula for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04f0d8d7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04f0d8d7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04f0d8d7
Branch: refs/heads/camel-2.11.x
Commit: 04f0d8d7f5c8a1494eba1b311d4fbf45b125d16e
Parents: 9fe42d1
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 14:55:23 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 14:55:55 2014 +0100
----------------------------------------------------------------------
.../org/apache/camel/processor/Throttler.java | 28 +++++++-----
.../apache/camel/processor/ThrottlerTest.java | 46 ++++++++++++++++++--
2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..3ecc24f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -34,8 +34,8 @@ import org.apache.camel.util.ObjectHelper;
* as only allowing 100 requests per second; or if huge load can cause a
* particular system to malfunction or to reduce its throughput you might want
* to introduce some throttling.
- *
- * @version
+ *
+ * @version
*/
public class Throttler extends DelayProcessorSupport implements Traceable {
private volatile long maximumRequestsPerPeriod;
@@ -79,7 +79,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
public Expression getMaximumRequestsPerPeriodExpression() {
return maxRequestsPerPeriodExpression;
}
-
+
public long getTimePeriodMillis() {
return timePeriodMillis;
}
@@ -115,6 +115,9 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue);
}
+ if (maximumRequestsPerPeriod > longValue) {
+ slot.capacity = 0;
+ }
maximumRequestsPerPeriod = longValue;
}
@@ -130,7 +133,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
return 0;
}
}
-
+
/*
* Determine what the next available time slot is for handling an Exchange
*/
@@ -138,7 +141,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (slot == null) {
slot = new TimeSlot();
}
- if (slot.isFull() || !slot.isActive()) {
+ if (slot.isFull() || !slot.isPast()) {
slot = slot.next();
}
slot.assign();
@@ -149,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
* A time slot is capable of handling a number of exchanges within a certain period of time.
*/
protected class TimeSlot {
-
+
private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
private final long duration = Throttler.this.timePeriodMillis;
private final long startTime;
@@ -165,7 +168,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
protected void assign() {
capacity--;
}
-
+
/*
* Start the next time slot either now or in the future
* (no time slots are being created in the past)
@@ -173,15 +176,20 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
protected TimeSlot next() {
return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
}
-
+
+ protected boolean isPast() {
+ long current = System.currentTimeMillis();
+ return current < (startTime + duration);
+ }
+
protected boolean isActive() {
long current = System.currentTimeMillis();
return startTime <= current && current < (startTime + duration);
}
-
+
protected boolean isFull() {
return capacity <= 0;
- }
+ }
}
TimeSlot getSlot() {
http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 01e3469..29fdc45 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -28,7 +28,7 @@ import org.apache.camel.processor.Throttler.TimeSlot;
import static org.apache.camel.builder.Builder.constant;
/**
- * @version
+ * @version
*/
public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
@@ -47,7 +47,7 @@ public class ThrottlerTest extends ContextTestSupport {
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
-
+
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(messageCount);
@@ -85,7 +85,7 @@ public class ThrottlerTest extends ContextTestSupport {
assertSame(slot, throttler.nextSlot());
assertTrue(slot.isFull());
assertTrue(slot.isActive());
-
+
TimeSlot next = throttler.nextSlot();
// now we should have a new slot that starts somewhere in the future
assertNotSame(slot, next);
@@ -159,6 +159,46 @@ public class ThrottlerTest extends ContextTestSupport {
executor.shutdownNow();
}
+ public void testConfigurationWithChangingHeaderExpression() throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+ resultEndpoint.reset();
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 10);
+
+ resultEndpoint.reset();
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+ executor.shutdownNow();
+ }
+
+ private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int
+ throttle) throws InterruptedException {
+ resultEndpoint.expectedMessageCount(messageCount);
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < messageCount; i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue",
+ throttle);
+ }
+ });
+ }
+
+ // let's wait for the exchanges to arrive
+ resultEndpoint.assertIsSatisfied();
+
+ // now assert that they have actually been throttled
+ long minimumTime = (messageCount - 1) * INTERVAL / throttle;
+ // add a little slack
+ long delta = System.currentTimeMillis() - start + 200;
+ assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
+ long maxTime = (messageCount - 1) * INTERVAL / throttle * 3;
+ assertTrue("Should take at most " + maxTime + "ms, was: " + delta, delta <= maxTime);
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {