You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/01/05 09:47:48 UTC

svn commit: r731493 - in /activemq/camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/resources/org/apache/camel/spring/processor/

Author: gertv
Date: Mon Jan  5 00:47:47 2009
New Revision: 731493

URL: http://svn.apache.org/viewvc?rev=731493&view=rev
Log:
Merged revisions 730132,730157,731488 via svnmerge from 
https://svn.eu.apache.org/repos/asf/activemq/camel/trunk

........
  r730132 | gertv | 2008-12-30 14:59:16 +0100 (Tue, 30 Dec 2008) | 1 line
  
  CAMEL-1199: Throttler appears to throttle per thread instead of over all threads
........
  r730157 | gertv | 2008-12-30 16:51:25 +0100 (Tue, 30 Dec 2008) | 1 line
  
  CAMEL-1199: Fixing bug I just introduced
........
  r731488 | gertv | 2009-01-05 09:14:27 +0100 (Mon, 05 Jan 2009) | 1 line
  
  CAMEL-1199: Throttler appears to throttle per thread
........

Modified:
    activemq/camel/branches/camel-1.x/   (props changed)
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan  5 00:47:47 2009
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730154,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Mon Jan  5 00:47:47 2009
@@ -33,8 +33,7 @@
 public class Throttler extends DelayProcessorSupport {
     private long maximumRequestsPerPeriod;
     private long timePeriodMillis;
-    private long startTimeMillis;
-    private long requestCount;
+    private TimeSlot slot;
 
     public Throttler(Processor processor, long maximumRequestsPerPeriod) {
         this(processor, maximumRequestsPerPeriod, 1000);
@@ -76,39 +75,64 @@
         this.timePeriodMillis = timePeriodMillis;
     }
 
-    /**
-     * The number of requests which have taken place so far within this time
-     * period
-     */
-    public long getRequestCount() {
-        return requestCount;
+    // Implementation methods
+    // -----------------------------------------------------------------------
+    protected void delay(Exchange exchange) throws Exception {
+        TimeSlot slot = nextSlot();
+        if (!slot.isActive()) {
+            waitUntil(slot.startTime, exchange);
+        }
     }
-
-    /**
-     * The start time when this current period began
+    
+    /*
+     * Determine what the next available time slot is for handling an Exchange
      */
-    public long getStartTimeMillis() {
-        return startTimeMillis;
+    protected synchronized TimeSlot nextSlot() {
+        if (slot == null) {
+            slot = new TimeSlot();
+        }
+        if (slot.isFull()) {
+            slot = slot.next();
+        }
+        slot.assign();
+        return slot;
     }
+    
+    /*
+     * A time slot is capable of handling a number of exchanges within a certain period of time.
+     */
+    protected class TimeSlot {
+        
+        private long capacity = Throttler.this.maximumRequestsPerPeriod;
+        private final long duration = Throttler.this.timePeriodMillis;
+        private final long startTime;
 
-    // Implementation methods
-    // -----------------------------------------------------------------------
-    protected void delay(Exchange exchange) throws Exception {
-        long now = currentSystemTime();
-        if (startTimeMillis == 0) {
-            startTimeMillis = now;
-        }
-        if (now - startTimeMillis > timePeriodMillis) {
-            // we're at the start of a new time period
-            // so lets reset things
-            requestCount = 1;
-            startTimeMillis = now;
-        } else {
-            if (++requestCount > maximumRequestsPerPeriod) {
-                // lets sleep until the start of the next time period
-                long time = startTimeMillis + timePeriodMillis;
-                waitUntil(time, exchange);
-            }
+        protected TimeSlot() {
+            this(System.currentTimeMillis());
+        }
+
+        protected TimeSlot(long startTime) {
+            this.startTime = startTime;
+        }
+        
+        protected void assign() {
+            capacity--;
+        }
+        
+        /*
+         * Start the next time slot either now or in the future
+         * (no time slots are being created in the past)
+         */
+        protected TimeSlot next() {
+            return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
+        }
+        
+        protected boolean isActive() {
+            return startTime <= System.currentTimeMillis();
         }
+        
+        protected boolean isFull() {
+            return capacity <= 0;
+        }        
     }
 }

Modified: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Mon Jan  5 00:47:47 2009
@@ -16,14 +16,19 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.Throttler.TimeSlot;
 
 /**
  * @version $Revision$
  */
 public class ThrottlerTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
     protected int messageCount = 6;
 
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
@@ -39,14 +44,53 @@
         // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
+    
+    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
+        long start = System.currentTimeMillis();
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    template.sendBody("direct:a", "<message>payload</message>");
+                }                
+            });
+        }
+        
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+        
+        // now assert that they have actually been throttled
+        long minimumTime = (messageCount - 1) * INTERVAL;
+        assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
+    }
+    
+    public void testTimeSlotCalculus() throws Exception {
+        Throttler throttler = new Throttler(null, 2, 1000);
+        TimeSlot slot = throttler.nextSlot();
+        // start a new time slot
+        assertNotNull(slot);
+        // make sure the same slot is used (2 exchanges per slot)
+        assertSame(slot, throttler.nextSlot());
+        assertTrue(slot.isFull());
+        
+        TimeSlot next = throttler.nextSlot();
+        // now we should have a new slot that starts somewhere in the future
+        assertNotSame(slot, next);
+        assertFalse(next.isActive());
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                from("seda:a").throttler(3).timePeriodMillis(30000).to("mock:result");
+                from("seda:a").throttler(3).timePeriodMillis(10000).to("mock:result");
                 // END SNIPPET: ex
+                
+                from("direct:a").throttler(1).timePeriodMillis(INTERVAL).to("mock:result");
             }
         };
     }
-}
\ No newline at end of file
+}

Modified: activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml (original)
+++ activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml Mon Jan  5 00:47:47 2009
@@ -31,6 +31,12 @@
         <to uri="mock:result" />
       </throttler>
     </route>
+    <route>
+      <from uri="direct:a" />
+      <throttler maximumRequestsPerPeriod="1" timePeriodMillis="500">
+        <to uri="mock:result" />
+      </throttler>
+    </route>
   </camelContext>
   <!-- END SNIPPET: example -->