You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/17 12:00:19 UTC
[3/3] git commit: CAMEL-6678: Fixed Throttler does not honor time
slots after period expires. Thanks to Christian Posta for the patch.
CAMEL-6678: Fixed Throttler does not honor time slots after period expires. Thanks to Christian Posta for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6333fe1f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6333fe1f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6333fe1f
Branch: refs/heads/camel-2.11.x
Commit: 6333fe1f73f961993aabfdc64ffbf15a48f1dbdc
Parents: c8da7e4
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 17 11:58:56 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 17 12:00:02 2013 +0200
----------------------------------------------------------------------
.../org/apache/camel/processor/Throttler.java | 9 ++++++--
.../apache/camel/processor/ThrottlerTest.java | 24 ++++++++++++++++----
2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6333fe1f/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 5aee3a9..52989a4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -138,7 +138,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (slot == null) {
slot = new TimeSlot();
}
- if (slot.isFull()) {
+ if (slot.isFull() || !slot.isActive()) {
slot = slot.next();
}
slot.assign();
@@ -175,11 +175,16 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
}
protected boolean isActive() {
- return startTime <= System.currentTimeMillis();
+ long current = System.currentTimeMillis();
+ return startTime <= current && current < (startTime + duration);
}
protected boolean isFull() {
return capacity <= 0;
}
}
+
+ TimeSlot getSlot() {
+ return this.slot;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6333fe1f/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 2e419ba..01e3469 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -84,6 +84,7 @@ public class ThrottlerTest extends ContextTestSupport {
// make sure the same slot is used (3 exchanges per slot)
assertSame(slot, throttler.nextSlot());
assertTrue(slot.isFull());
+ assertTrue(slot.isActive());
TimeSlot next = throttler.nextSlot();
// now we should have a new slot that starts somewhere in the future
@@ -91,6 +92,21 @@ public class ThrottlerTest extends ContextTestSupport {
assertFalse(next.isActive());
}
+ public void testTimeSlotCalculusForPeriod() throws InterruptedException {
+ Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+ throttler.calculateDelay(new DefaultExchange(context));
+
+ TimeSlot slot = throttler.getSlot();
+ assertNotNull(slot);
+ assertSame(slot, throttler.nextSlot());
+
+ // we've only used up two of the three exchanges allowed in this slot, but now we
+ // introduce a time delay so that the slot is no longer active
+ Thread.sleep((long) (1.5 * 1000));
+ assertFalse(slot.isActive());
+ assertNotSame(slot, throttler.nextSlot());
+ }
+
public void testConfigurationWithConstantExpression() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(messageCount);
@@ -142,18 +158,18 @@ public class ThrottlerTest extends ContextTestSupport {
assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
executor.shutdownNow();
}
-
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
// END SNIPPET: ex
-
+
from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-
+
from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-
+
from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
}
};