You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/11 15:09:08 UTC

[1/5] git commit: CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.

Updated Branches:
  refs/heads/camel-2.11.x c7e1f3ebc -> 4d2d91e30
  refs/heads/camel-2.12.x 8257f46be -> e8288db96
  refs/heads/master a562c867e -> c698df6e7


CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3506c3ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3506c3ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3506c3ef

Branch: refs/heads/master
Commit: 3506c3ef5a5d5360245315bbc2ae2b8962705c04
Parents: a562c86
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 13:44:48 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 13:44:48 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/util/UnsafeUriCharactersEncoder.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3506c3ef/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
index 4bc707f..0a72c8a 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
@@ -26,7 +26,7 @@ import java.util.BitSet;
 public final class UnsafeUriCharactersEncoder {
     private static BitSet unsafeCharacters;   
     private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C',
-                                              'D', 'E', 'F'};
+                                              'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f'};
 
     static {
         unsafeCharacters = new BitSet(256);


[2/5] git commit: CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.

Posted by da...@apache.org.
CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a56cbd2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a56cbd2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a56cbd2

Branch: refs/heads/camel-2.12.x
Commit: 3a56cbd269c84628e85c8112cf3d8ce71c059809
Parents: 8257f46
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 13:44:48 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 13:45:08 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/util/UnsafeUriCharactersEncoder.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3a56cbd2/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
index 4bc707f..0a72c8a 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
@@ -26,7 +26,7 @@ import java.util.BitSet;
 public final class UnsafeUriCharactersEncoder {
     private static BitSet unsafeCharacters;   
     private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C',
-                                              'D', 'E', 'F'};
+                                              'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f'};
 
     static {
         unsafeCharacters = new BitSet(256);


[4/5] git commit: CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.

Posted by da...@apache.org.
CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c698df6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c698df6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c698df6e

Branch: refs/heads/master
Commit: c698df6e7a679e8e391baa75f2ac63523e740bb6
Parents: 3506c3e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 15:08:28 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 15:08:28 2013 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedThrottlerMBean.java |   3 +
 .../management/mbean/ManagedThrottler.java      |   4 +
 .../camel/processor/DelayProcessorSupport.java  |  20 ++
 .../org/apache/camel/processor/Throttler.java   |  11 +-
 .../camel/management/ManagedThrottlerTest.java  | 275 ++++++++++++++++++-
 5 files changed, 304 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index a0dc3a0..feec600 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -32,4 +32,7 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Time period in millis")
     void setTimePeriodMillis(long timePeriodMillis);
 
+    @ManagedAttribute(description = "Number of exchanges currently throttled")
+    int getThrottledCount();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index f80dfa1..99a5e95 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -55,4 +55,8 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl
     public void setTimePeriodMillis(long timePeriodMillis) {
         getThrottler().setTimePeriodMillis(timePeriodMillis);
     }
+
+    public int getThrottledCount() {
+        return getThrottler().getDelayedCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ef69759..ff81170 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -43,6 +44,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     private final boolean shutdownExecutorService;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private final AtomicInteger delayedCount = new AtomicInteger(0);
 
     // TODO: Add option to cancel tasks on shutdown so we can stop fast
 
@@ -56,6 +58,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         }
 
         public void run() {
+            // we are running now so decrement the counter
+            delayedCount.decrementAndGet();
+
             log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
             if (!isRunAllowed()) {
                 exchange.setException(new RejectedExecutionException("Run is not allowed"));
@@ -123,6 +128,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             }
         } else {
             // asynchronous delay so schedule a process call task
+            // and increment the counter (we decrement the counter when we run the ProcessCall)
+            delayedCount.incrementAndGet();
             ProcessCall call = new ProcessCall(exchange, callback);
             try {
                 log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
@@ -131,6 +138,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
                 // tell Camel routing engine we continue routing asynchronous
                 return false;
             } catch (RejectedExecutionException e) {
+                // we were not allowed to run the ProcessCall, so need to decrement the counter here
+                delayedCount.decrementAndGet();
                 if (isCallerRunsWhenRejected()) {
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
@@ -174,6 +183,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected abstract long calculateDelay(Exchange exchange);
 
     /**
+     * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit)
+     */
+    public int getDelayedCount() {
+        return delayedCount.get();
+    }
+
+    /**
      * Delays the given time before continuing.
      * <p/>
      * This implementation will block while waiting
@@ -191,9 +207,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             return;
         } else {
             try {
+                // keep track on delayer counter while we sleep
+                delayedCount.incrementAndGet();
                 sleep(delay);
             } catch (InterruptedException e) {
                 handleSleepInterruptedException(e, exchange);
+            } finally {
+                delayedCount.decrementAndGet();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..ae6bc26 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
 public class Throttler extends DelayProcessorSupport implements Traceable {
     private volatile long maximumRequestsPerPeriod;
     private Expression maxRequestsPerPeriodExpression;
-    private long timePeriodMillis = 1000;
+    private AtomicLong timePeriodMillis = new AtomicLong(1000);
     private volatile TimeSlot slot;
 
     public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
@@ -53,7 +54,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         if (timePeriodMillis <= 0) {
             throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
         }
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     @Override
@@ -81,7 +82,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     }
     
     public long getTimePeriodMillis() {
-        return timePeriodMillis;
+        return timePeriodMillis.get();
     }
 
     /**
@@ -95,7 +96,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
      * Sets the time period during which the maximum number of requests apply
      */
     public void setTimePeriodMillis(long timePeriodMillis) {
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     // Implementation methods
@@ -151,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     protected class TimeSlot {
         
         private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
-        private final long duration = Throttler.this.timePeriodMillis;
+        private final long duration = Throttler.this.timePeriodMillis.get();
         private final long startTime;
 
         protected TimeSlot() {

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 2f90dfe..feb3e1c 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -16,14 +16,23 @@
  */
 package org.apache.camel.management;
 
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * @version 
+ * @version
  */
 public class ManagedThrottlerTest extends ManagementTestSupport {
 
@@ -95,15 +104,273 @@ public class ManagedThrottlerTest extends ManagementTestSupport {
         assertTrue("Should be around 5 sec now: was " + total, total > 3500);
     }
 
+    public void testThrottleVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:end").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCount").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCount", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler3\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route3\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsync").expectedMessageCount(10);
+
+        // we pick '5' because we are right in the middle of the number of messages
+        // that have been and reduces any race conditions to minimal...
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsync").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsync", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncExceptionVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler4\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route4\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsyncException").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsyncException").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsyncException", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        // give a sec for exception handling to finish..
+        Thread.sleep(500);
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        // since all exchanges ended w/ exception, they are not completed
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(0, completed.longValue());
+
+    }
+
+    public void testRejectedExecution() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncReject");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(1);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint1");
+        exceptionMock.expectedMessageCount(9);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecution", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+    }
+
+    public void testRejectedExecutionCallerRuns() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncRejectCallerRuns");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(10);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint");
+        exceptionMock.expectedMessageCount(0);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecutionCallerRuns", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
+        final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) {
+            @Override
+            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+                throw new RejectedExecutionException();
+            }
+        };
+
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("log:foo")
-                    .throttle(10).id("mythrottler")
-                    .to("mock:result");
+                        .to("log:foo")
+                        .throttle(10).id("mythrottler")
+                        .to("mock:result");
+
+                from("seda:throttleCount")
+                        .throttle(1).timePeriodMillis(250).id("mythrottler2")
+                        .to("mock:end");
+
+                from("seda:throttleCountAsync")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+                        .to("mock:endAsync");
+
+                from("seda:throttleCountAsyncException")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+                        .to("mock:endAsyncException")
+                        .process(new Processor() {
+
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                throw new RuntimeException("Fail me");
+                            }
+                        });
+                from("seda:throttleCountRejectExecutionCallerRuns")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(true)
+                            .id("mythrottler5")
+                        .to("mock:endAsyncRejectCallerRuns");
+
+                from("seda:throttleCountRejectExecution")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(false)
+                            .id("mythrottler6")
+                        .to("mock:endAsyncReject");
             }
         };
     }


[3/5] git commit: CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.

Posted by da...@apache.org.
CAMEL-6853: Lower case a..f should be in uri character encorder. Thanks to Thomas Konstantinides for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d2d91e3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d2d91e3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d2d91e3

Branch: refs/heads/camel-2.11.x
Commit: 4d2d91e304fd745926201356da9818337ef1eb8b
Parents: c7e1f3e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 13:44:48 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 13:45:20 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/util/UnsafeUriCharactersEncoder.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4d2d91e3/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
index 4bc707f..0a72c8a 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnsafeUriCharactersEncoder.java
@@ -26,7 +26,7 @@ import java.util.BitSet;
 public final class UnsafeUriCharactersEncoder {
     private static BitSet unsafeCharacters;   
     private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C',
-                                              'D', 'E', 'F'};
+                                              'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f'};
 
     static {
         unsafeCharacters = new BitSet(256);


[5/5] git commit: CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.

Posted by da...@apache.org.
CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8288db9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8288db9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8288db9

Branch: refs/heads/camel-2.12.x
Commit: e8288db967011fdc274c40c656204634c56f3baa
Parents: 3a56cbd
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 15:08:28 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 15:08:50 2013 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedThrottlerMBean.java |   3 +
 .../management/mbean/ManagedThrottler.java      |   4 +
 .../camel/processor/DelayProcessorSupport.java  |  20 ++
 .../org/apache/camel/processor/Throttler.java   |  11 +-
 .../camel/management/ManagedThrottlerTest.java  | 275 ++++++++++++++++++-
 5 files changed, 304 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e8288db9/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index a0dc3a0..feec600 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -32,4 +32,7 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Time period in millis")
     void setTimePeriodMillis(long timePeriodMillis);
 
+    @ManagedAttribute(description = "Number of exchanges currently throttled")
+    int getThrottledCount();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e8288db9/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index f80dfa1..99a5e95 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -55,4 +55,8 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl
     public void setTimePeriodMillis(long timePeriodMillis) {
         getThrottler().setTimePeriodMillis(timePeriodMillis);
     }
+
+    public int getThrottledCount() {
+        return getThrottler().getDelayedCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8288db9/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ef69759..ff81170 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -43,6 +44,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     private final boolean shutdownExecutorService;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private final AtomicInteger delayedCount = new AtomicInteger(0);
 
     // TODO: Add option to cancel tasks on shutdown so we can stop fast
 
@@ -56,6 +58,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         }
 
         public void run() {
+            // we are running now so decrement the counter
+            delayedCount.decrementAndGet();
+
             log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
             if (!isRunAllowed()) {
                 exchange.setException(new RejectedExecutionException("Run is not allowed"));
@@ -123,6 +128,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             }
         } else {
             // asynchronous delay so schedule a process call task
+            // and increment the counter (we decrement the counter when we run the ProcessCall)
+            delayedCount.incrementAndGet();
             ProcessCall call = new ProcessCall(exchange, callback);
             try {
                 log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
@@ -131,6 +138,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
                 // tell Camel routing engine we continue routing asynchronous
                 return false;
             } catch (RejectedExecutionException e) {
+                // we were not allowed to run the ProcessCall, so need to decrement the counter here
+                delayedCount.decrementAndGet();
                 if (isCallerRunsWhenRejected()) {
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
@@ -174,6 +183,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected abstract long calculateDelay(Exchange exchange);
 
     /**
+     * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit)
+     */
+    public int getDelayedCount() {
+        return delayedCount.get();
+    }
+
+    /**
      * Delays the given time before continuing.
      * <p/>
      * This implementation will block while waiting
@@ -191,9 +207,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             return;
         } else {
             try {
+                // keep track on delayer counter while we sleep
+                delayedCount.incrementAndGet();
                 sleep(delay);
             } catch (InterruptedException e) {
                 handleSleepInterruptedException(e, exchange);
+            } finally {
+                delayedCount.decrementAndGet();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8288db9/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..ae6bc26 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
 public class Throttler extends DelayProcessorSupport implements Traceable {
     private volatile long maximumRequestsPerPeriod;
     private Expression maxRequestsPerPeriodExpression;
-    private long timePeriodMillis = 1000;
+    private AtomicLong timePeriodMillis = new AtomicLong(1000);
     private volatile TimeSlot slot;
 
     public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
@@ -53,7 +54,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         if (timePeriodMillis <= 0) {
             throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
         }
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     @Override
@@ -81,7 +82,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     }
     
     public long getTimePeriodMillis() {
-        return timePeriodMillis;
+        return timePeriodMillis.get();
     }
 
     /**
@@ -95,7 +96,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
      * Sets the time period during which the maximum number of requests apply
      */
     public void setTimePeriodMillis(long timePeriodMillis) {
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     // Implementation methods
@@ -151,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     protected class TimeSlot {
         
         private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
-        private final long duration = Throttler.this.timePeriodMillis;
+        private final long duration = Throttler.this.timePeriodMillis.get();
         private final long startTime;
 
         protected TimeSlot() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e8288db9/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 2f90dfe..feb3e1c 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -16,14 +16,23 @@
  */
 package org.apache.camel.management;
 
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * @version 
+ * @version
  */
 public class ManagedThrottlerTest extends ManagementTestSupport {
 
@@ -95,15 +104,273 @@ public class ManagedThrottlerTest extends ManagementTestSupport {
         assertTrue("Should be around 5 sec now: was " + total, total > 3500);
     }
 
+    public void testThrottleVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:end").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCount").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCount", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler3\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route3\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsync").expectedMessageCount(10);
+
+        // we pick '5' because we are right in the middle of the number of messages
+        // that have been and reduces any race conditions to minimal...
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsync").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsync", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncExceptionVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler4\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route4\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsyncException").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsyncException").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsyncException", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        // give a sec for exception handling to finish..
+        Thread.sleep(500);
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        // since all exchanges ended w/ exception, they are not completed
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(0, completed.longValue());
+
+    }
+
+    public void testRejectedExecution() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncReject");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(1);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint1");
+        exceptionMock.expectedMessageCount(9);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecution", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+    }
+
+    public void testRejectedExecutionCallerRuns() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncRejectCallerRuns");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(10);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint");
+        exceptionMock.expectedMessageCount(0);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecutionCallerRuns", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
+        final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) {
+            @Override
+            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+                throw new RejectedExecutionException();
+            }
+        };
+
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("log:foo")
-                    .throttle(10).id("mythrottler")
-                    .to("mock:result");
+                        .to("log:foo")
+                        .throttle(10).id("mythrottler")
+                        .to("mock:result");
+
+                from("seda:throttleCount")
+                        .throttle(1).timePeriodMillis(250).id("mythrottler2")
+                        .to("mock:end");
+
+                from("seda:throttleCountAsync")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+                        .to("mock:endAsync");
+
+                from("seda:throttleCountAsyncException")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+                        .to("mock:endAsyncException")
+                        .process(new Processor() {
+
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                throw new RuntimeException("Fail me");
+                            }
+                        });
+                from("seda:throttleCountRejectExecutionCallerRuns")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(true)
+                            .id("mythrottler5")
+                        .to("mock:endAsyncRejectCallerRuns");
+
+                from("seda:throttleCountRejectExecution")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(false)
+                            .id("mythrottler6")
+                        .to("mock:endAsyncReject");
             }
         };
     }