You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 14:55:06 UTC

[3/3] git commit: CAMEL-7160: Fixed throttler not applying the expected rate when the maximum requests per period is changed dynamically via a header. Thanks to Michael Pisula for the patch.

CAMEL-7160: Fixed throttler not applying the expected rate when the maximum requests per period is changed dynamically via a header. Thanks to Michael Pisula for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04f0d8d7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04f0d8d7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04f0d8d7

Branch: refs/heads/camel-2.11.x
Commit: 04f0d8d7f5c8a1494eba1b311d4fbf45b125d16e
Parents: 9fe42d1
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Feb 3 14:55:23 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 3 14:55:55 2014 +0100

----------------------------------------------------------------------
 .../org/apache/camel/processor/Throttler.java   | 28 +++++++-----
 .../apache/camel/processor/ThrottlerTest.java   | 46 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..3ecc24f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -34,8 +34,8 @@ import org.apache.camel.util.ObjectHelper;
  * as only allowing 100 requests per second; or if huge load can cause a
  * particular system to malfunction or to reduce its throughput you might want
  * to introduce some throttling.
- * 
- * @version 
+ *
+ * @version
  */
 public class Throttler extends DelayProcessorSupport implements Traceable {
     private volatile long maximumRequestsPerPeriod;
@@ -79,7 +79,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     public Expression getMaximumRequestsPerPeriodExpression() {
         return maxRequestsPerPeriodExpression;
     }
-    
+
     public long getTimePeriodMillis() {
         return timePeriodMillis;
     }
@@ -115,6 +115,9 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
             if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
                 log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue);
             }
+            if (maximumRequestsPerPeriod > longValue) {
+                slot.capacity = 0;
+            }
             maximumRequestsPerPeriod = longValue;
         }
 
@@ -130,7 +133,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
             return 0;
         }
     }
-    
+
     /*
      * Determine what the next available time slot is for handling an Exchange
      */
@@ -138,7 +141,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         if (slot == null) {
             slot = new TimeSlot();
         }
-        if (slot.isFull() || !slot.isActive()) {
+        if (slot.isFull() || !slot.isPast()) {
             slot = slot.next();
         }
         slot.assign();
@@ -149,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     * A time slot is capable of handling a number of exchanges within a certain period of time.
     */
     protected class TimeSlot {
-        
+
         private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
         private final long duration = Throttler.this.timePeriodMillis;
         private final long startTime;
@@ -165,7 +168,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         protected void assign() {
             capacity--;
         }
-        
+
         /*
          * Start the next time slot either now or in the future
          * (no time slots are being created in the past)
@@ -173,15 +176,20 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         protected TimeSlot next() {
             return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
         }
-        
+
+        protected boolean isPast() {
+            long current = System.currentTimeMillis();
+            return current < (startTime + duration);
+        }
+
         protected boolean isActive() {
             long current = System.currentTimeMillis();
             return startTime <= current && current < (startTime + duration);
         }
-        
+
         protected boolean isFull() {
             return capacity <= 0;
-        }        
+        }
     }
 
     TimeSlot getSlot() {

http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index 01e3469..29fdc45 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -28,7 +28,7 @@ import org.apache.camel.processor.Throttler.TimeSlot;
 import static org.apache.camel.builder.Builder.constant;
 
 /**
- * @version 
+ * @version
  */
 public class ThrottlerTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
@@ -47,7 +47,7 @@ public class ThrottlerTest extends ContextTestSupport {
         // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
-    
+
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(messageCount);
@@ -85,7 +85,7 @@ public class ThrottlerTest extends ContextTestSupport {
         assertSame(slot, throttler.nextSlot());
         assertTrue(slot.isFull());
         assertTrue(slot.isActive());
-        
+
         TimeSlot next = throttler.nextSlot();
         // now we should have a new slot that starts somewhere in the future
         assertNotSame(slot, next);
@@ -159,6 +159,46 @@ public class ThrottlerTest extends ContextTestSupport {
         executor.shutdownNow();
     }
 
+    public void testConfigurationWithChangingHeaderExpression() throws Exception {
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+        resultEndpoint.reset();
+        sendMessagesWithHeaderExpression(executor, resultEndpoint, 10);
+
+        resultEndpoint.reset();
+        sendMessagesWithHeaderExpression(executor, resultEndpoint, 1);
+
+        executor.shutdownNow();
+    }
+
+    private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int
+            throttle) throws InterruptedException {
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue",
+                            throttle);
+                }
+            });
+        }
+
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+
+        // now assert that they have actually been throttled
+        long minimumTime = (messageCount - 1) * INTERVAL / throttle;
+        // add a little slack
+        long delta = System.currentTimeMillis() - start + 200;
+        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
+        long maxTime = (messageCount - 1) * INTERVAL / throttle * 3;
+        assertTrue("Should take at most " + maxTime + "ms, was: " + delta, delta <= maxTime);
+    }
+
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {