You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/01/05 09:47:48 UTC
svn commit: r731493 - in /activemq/camel/branches/camel-1.x: ./
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/processor/
components/camel-spring/src/test/resources/org/apache/camel/spring/processor/
Author: gertv
Date: Mon Jan 5 00:47:47 2009
New Revision: 731493
URL: http://svn.apache.org/viewvc?rev=731493&view=rev
Log:
Merged revisions 730132,730157,731488 via svnmerge from
https://svn.eu.apache.org/repos/asf/activemq/camel/trunk
........
r730132 | gertv | 2008-12-30 14:59:16 +0100 (Tue, 30 Dec 2008) | 1 line
CAMEL-1199: Throttler appears to throttle per thread instead of over all threads
........
r730157 | gertv | 2008-12-30 16:51:25 +0100 (Tue, 30 Dec 2008) | 1 line
CAMEL-1199: Fixing bug I just introduced
........
r731488 | gertv | 2009-01-05 09:14:27 +0100 (Mon, 05 Jan 2009) | 1 line
CAMEL-1199: Throttler appears to throttle per thread
........
Modified:
activemq/camel/branches/camel-1.x/ (props changed)
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 5 00:47:47 2009
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730154,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Mon Jan 5 00:47:47 2009
@@ -33,8 +33,7 @@
public class Throttler extends DelayProcessorSupport {
private long maximumRequestsPerPeriod;
private long timePeriodMillis;
- private long startTimeMillis;
- private long requestCount;
+ private TimeSlot slot;
public Throttler(Processor processor, long maximumRequestsPerPeriod) {
this(processor, maximumRequestsPerPeriod, 1000);
@@ -76,39 +75,64 @@
this.timePeriodMillis = timePeriodMillis;
}
- /**
- * The number of requests which have taken place so far within this time
- * period
- */
- public long getRequestCount() {
- return requestCount;
+ // Implementation methods
+ // -----------------------------------------------------------------------
+ protected void delay(Exchange exchange) throws Exception {
+ TimeSlot slot = nextSlot();
+ if (!slot.isActive()) {
+ waitUntil(slot.startTime, exchange);
+ }
}
-
- /**
- * The start time when this current period began
+
+ /*
+ * Determine what the next available time slot is for handling an Exchange
*/
- public long getStartTimeMillis() {
- return startTimeMillis;
+ protected synchronized TimeSlot nextSlot() {
+ if (slot == null) {
+ slot = new TimeSlot();
+ }
+ if (slot.isFull()) {
+ slot = slot.next();
+ }
+ slot.assign();
+ return slot;
}
+
+ /*
+ * A time slot is capable of handling a number of exchanges within a certain period of time.
+ */
+ protected class TimeSlot {
+
+ private long capacity = Throttler.this.maximumRequestsPerPeriod;
+ private final long duration = Throttler.this.timePeriodMillis;
+ private final long startTime;
- // Implementation methods
- // -----------------------------------------------------------------------
- protected void delay(Exchange exchange) throws Exception {
- long now = currentSystemTime();
- if (startTimeMillis == 0) {
- startTimeMillis = now;
- }
- if (now - startTimeMillis > timePeriodMillis) {
- // we're at the start of a new time period
- // so lets reset things
- requestCount = 1;
- startTimeMillis = now;
- } else {
- if (++requestCount > maximumRequestsPerPeriod) {
- // lets sleep until the start of the next time period
- long time = startTimeMillis + timePeriodMillis;
- waitUntil(time, exchange);
- }
+ protected TimeSlot() {
+ this(System.currentTimeMillis());
+ }
+
+ protected TimeSlot(long startTime) {
+ this.startTime = startTime;
+ }
+
+ protected void assign() {
+ capacity--;
+ }
+
+ /*
+ * Start the next time slot either now or in the future
+ * (no time slots are being created in the past)
+ */
+ protected TimeSlot next() {
+ return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
+ }
+
+ protected boolean isActive() {
+ return startTime <= System.currentTimeMillis();
}
+
+ protected boolean isFull() {
+ return capacity <= 0;
+ }
}
}
Modified: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Mon Jan 5 00:47:47 2009
@@ -16,14 +16,19 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.Throttler.TimeSlot;
/**
* @version $Revision$
*/
public class ThrottlerTest extends ContextTestSupport {
+ private static final int INTERVAL = 500;
protected int messageCount = 6;
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
@@ -39,14 +44,53 @@
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
+
+ public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
+ long start = System.currentTimeMillis();
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(messageCount);
+
+ ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ template.sendBody("direct:a", "<message>payload</message>");
+ }
+ });
+ }
+
+ // let's wait for the exchanges to arrive
+ resultEndpoint.assertIsSatisfied();
+
+ // now assert that they have actually been throttled
+ long minimumTime = (messageCount - 1) * INTERVAL;
+ assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
+ }
+
+ public void testTimeSlotCalculus() throws Exception {
+ Throttler throttler = new Throttler(null, 2, 1000);
+ TimeSlot slot = throttler.nextSlot();
+ // start a new time slot
+ assertNotNull(slot);
+ // make sure the same slot is used (2 exchanges per slot)
+ assertSame(slot, throttler.nextSlot());
+ assertTrue(slot.isFull());
+
+ TimeSlot next = throttler.nextSlot();
+ // now we should have a new slot that starts somewhere in the future
+ assertNotSame(slot, next);
+ assertFalse(next.isActive());
+ }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
- from("seda:a").throttler(3).timePeriodMillis(30000).to("mock:result");
+ from("seda:a").throttler(3).timePeriodMillis(10000).to("mock:result");
// END SNIPPET: ex
+
+ from("direct:a").throttler(1).timePeriodMillis(INTERVAL).to("mock:result");
}
};
}
-}
\ No newline at end of file
+}
Modified: activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml?rev=731493&r1=731492&r2=731493&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml (original)
+++ activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml Mon Jan 5 00:47:47 2009
@@ -31,6 +31,12 @@
<to uri="mock:result" />
</throttler>
</route>
+ <route>
+ <from uri="direct:a" />
+ <throttler maximumRequestsPerPeriod="1" timePeriodMillis="500">
+ <to uri="mock:result" />
+ </throttler>
+ </route>
</camelContext>
<!-- END SNIPPET: example -->