You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/17 12:00:19 UTC

[3/3] git commit: CAMEL-6678: Fixed Throttler does not honor time slots after period expires. Thanks to Christian Posta for the patch.

CAMEL-6678: Fixed Throttler does not honor time slots after period expires. Thanks to Christian Posta for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6333fe1f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6333fe1f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6333fe1f

Branch: refs/heads/camel-2.11.x
Commit: 6333fe1f73f961993aabfdc64ffbf15a48f1dbdc
Parents: c8da7e4
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 17 11:58:56 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 17 12:00:02 2013 +0200

----------------------------------------------------------------------
 .../org/apache/camel/processor/Throttler.java   |  9 ++++++--
 .../apache/camel/processor/ThrottlerTest.java   | 24 ++++++++++++++++----
 2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6333fe1f/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 5aee3a9..52989a4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -138,7 +138,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         if (slot == null) {
             slot = new TimeSlot();
         }
-        if (slot.isFull()) {
+        if (slot.isFull() || !slot.isActive()) {
             slot = slot.next();
         }
         slot.assign();
@@ -175,11 +175,16 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         }
         
         protected boolean isActive() {
-            return startTime <= System.currentTimeMillis();
+            long current = System.currentTimeMillis();
+            return startTime <= current && current < (startTime + duration);
         }
         
         protected boolean isFull() {
             return capacity <= 0;
         }        
     }
+
+    TimeSlot getSlot() {
+        return this.slot;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6333fe1f/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 2e419ba..01e3469 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -84,6 +84,7 @@ public class ThrottlerTest extends ContextTestSupport {
         // make sure the same slot is used (3 exchanges per slot)
         assertSame(slot, throttler.nextSlot());
         assertTrue(slot.isFull());
+        assertTrue(slot.isActive());
         
         TimeSlot next = throttler.nextSlot();
         // now we should have a new slot that starts somewhere in the future
@@ -91,6 +92,21 @@ public class ThrottlerTest extends ContextTestSupport {
         assertFalse(next.isActive());
     }
 
+    public void testTimeSlotCalculusForPeriod() throws InterruptedException {
+        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
+        throttler.calculateDelay(new DefaultExchange(context));
+
+        TimeSlot slot = throttler.getSlot();
+        assertNotNull(slot);
+        assertSame(slot, throttler.nextSlot());
+
+        // we've only used up two of the three exchanges allowed in this slot, but now we
+        // introduce a time delay so that the slot's period expires and it is no longer active
+        Thread.sleep((long) (1.5 * 1000));
+        assertFalse(slot.isActive());
+        assertNotSame(slot, throttler.nextSlot());
+    }
+
     public void testConfigurationWithConstantExpression() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(messageCount);
@@ -142,18 +158,18 @@ public class ThrottlerTest extends ContextTestSupport {
         assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
         executor.shutdownNow();
     }
-    
+
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
                 from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
                 // END SNIPPET: ex
-                
+
                 from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-                
+
                 from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-                
+
                 from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
             }
         };