You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/11 15:09:11 UTC
[4/5] git commit: CAMEL-6670: Added current throttled number of
exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.
CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c698df6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c698df6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c698df6e
Branch: refs/heads/master
Commit: c698df6e7a679e8e391baa75f2ac63523e740bb6
Parents: 3506c3e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 15:08:28 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 15:08:28 2013 +0200
----------------------------------------------------------------------
.../management/mbean/ManagedThrottlerMBean.java | 3 +
.../management/mbean/ManagedThrottler.java | 4 +
.../camel/processor/DelayProcessorSupport.java | 20 ++
.../org/apache/camel/processor/Throttler.java | 11 +-
.../camel/management/ManagedThrottlerTest.java | 275 ++++++++++++++++++-
5 files changed, 304 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index a0dc3a0..feec600 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -32,4 +32,7 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "Time period in millis")
void setTimePeriodMillis(long timePeriodMillis);
+ @ManagedAttribute(description = "Number of exchanges currently throttled")
+ int getThrottledCount();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index f80dfa1..99a5e95 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -55,4 +55,8 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl
public void setTimePeriodMillis(long timePeriodMillis) {
getThrottler().setTimePeriodMillis(timePeriodMillis);
}
+
+ public int getThrottledCount() {
+ return getThrottler().getDelayedCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ef69759..ff81170 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -43,6 +44,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
private final boolean shutdownExecutorService;
private boolean asyncDelayed;
private boolean callerRunsWhenRejected = true;
+ private final AtomicInteger delayedCount = new AtomicInteger(0);
// TODO: Add option to cancel tasks on shutdown so we can stop fast
@@ -56,6 +58,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
}
public void run() {
+ // we are running now so decrement the counter
+ delayedCount.decrementAndGet();
+
log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
if (!isRunAllowed()) {
exchange.setException(new RejectedExecutionException("Run is not allowed"));
@@ -123,6 +128,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
}
} else {
// asynchronous delay so schedule a process call task
+ // and increment the counter (we decrement the counter when we run the ProcessCall)
+ delayedCount.incrementAndGet();
ProcessCall call = new ProcessCall(exchange, callback);
try {
log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
@@ -131,6 +138,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
// tell Camel routing engine we continue routing asynchronous
return false;
} catch (RejectedExecutionException e) {
+ // we were not allowed to run the ProcessCall, so need to decrement the counter here
+ delayedCount.decrementAndGet();
if (isCallerRunsWhenRejected()) {
if (!isRunAllowed()) {
exchange.setException(new RejectedExecutionException());
@@ -174,6 +183,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
protected abstract long calculateDelay(Exchange exchange);
/**
+ * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit)
+ */
+ public int getDelayedCount() {
+ return delayedCount.get();
+ }
+
+ /**
* Delays the given time before continuing.
* <p/>
* This implementation will block while waiting
@@ -191,9 +207,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
return;
} else {
try {
+ // keep track on delayer counter while we sleep
+ delayedCount.incrementAndGet();
sleep(delay);
} catch (InterruptedException e) {
handleSleepInterruptedException(e, exchange);
+ } finally {
+ delayedCount.decrementAndGet();
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..ae6bc26 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
public class Throttler extends DelayProcessorSupport implements Traceable {
private volatile long maximumRequestsPerPeriod;
private Expression maxRequestsPerPeriodExpression;
- private long timePeriodMillis = 1000;
+ private AtomicLong timePeriodMillis = new AtomicLong(1000);
private volatile TimeSlot slot;
public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
@@ -53,7 +54,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
if (timePeriodMillis <= 0) {
throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
}
- this.timePeriodMillis = timePeriodMillis;
+ this.timePeriodMillis.set(timePeriodMillis);
}
@Override
@@ -81,7 +82,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
}
public long getTimePeriodMillis() {
- return timePeriodMillis;
+ return timePeriodMillis.get();
}
/**
@@ -95,7 +96,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
* Sets the time period during which the maximum number of requests apply
*/
public void setTimePeriodMillis(long timePeriodMillis) {
- this.timePeriodMillis = timePeriodMillis;
+ this.timePeriodMillis.set(timePeriodMillis);
}
// Implementation methods
@@ -151,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
protected class TimeSlot {
private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
- private final long duration = Throttler.this.timePeriodMillis;
+ private final long duration = Throttler.this.timePeriodMillis.get();
private final long startTime;
protected TimeSlot() {
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 2f90dfe..feb3e1c 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -16,14 +16,23 @@
*/
package org.apache.camel.management;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
- * @version
+ * @version
*/
public class ManagedThrottlerTest extends ManagementTestSupport {
@@ -95,15 +104,273 @@ public class ManagedThrottlerTest extends ManagementTestSupport {
assertTrue("Should be around 5 sec now: was " + total, total > 3500);
}
+ public void testThrottleVisableViaJmx() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ // get the object name for the delayer
+ ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+ // use route to get the total time
+ ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+ // reset the counters
+ mbeanServer.invoke(routeName, "reset", null, null);
+
+ getMockEndpoint("mock:end").expectedMessageCount(10);
+
+ NotifyBuilder notifier = new NotifyBuilder(context).
+ from("seda:throttleCount").whenReceived(5).create();
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:throttleCount", "Message " + i);
+ }
+
+ assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+ Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+ // we are expecting this to be > 0
+ assertTrue(throttledMessages.intValue() > 0);
+
+ assertMockEndpointsSatisfied();
+
+ throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+ assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+ Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+ assertEquals(10, completed.longValue());
+
+ }
+
+ public void testThrottleAsyncVisableViaJmx() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ // get the object name for the delayer
+ ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler3\"");
+
+ // use route to get the total time
+ ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route3\"");
+
+ // reset the counters
+ mbeanServer.invoke(routeName, "reset", null, null);
+
+ getMockEndpoint("mock:endAsync").expectedMessageCount(10);
+
+ // we pick '5' because we are right in the middle of the number of messages
+ // that have been and reduces any race conditions to minimal...
+ NotifyBuilder notifier = new NotifyBuilder(context).
+ from("seda:throttleCountAsync").whenReceived(5).create();
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:throttleCountAsync", "Message " + i);
+ }
+
+ assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+ Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+ // we are expecting this to be > 0
+ assertTrue(throttledMessages.intValue() > 0);
+
+ assertMockEndpointsSatisfied();
+
+ throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+ assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+ Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+ assertEquals(10, completed.longValue());
+
+ }
+
+ public void testThrottleAsyncExceptionVisableViaJmx() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ // get the object name for the delayer
+ ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler4\"");
+
+ // use route to get the total time
+ ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route4\"");
+
+ // reset the counters
+ mbeanServer.invoke(routeName, "reset", null, null);
+
+ getMockEndpoint("mock:endAsyncException").expectedMessageCount(10);
+
+ NotifyBuilder notifier = new NotifyBuilder(context).
+ from("seda:throttleCountAsyncException").whenReceived(5).create();
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:throttleCountAsyncException", "Message " + i);
+ }
+
+ assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+ Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+ // we are expecting this to be > 0
+ assertTrue(throttledMessages.intValue() > 0);
+
+ assertMockEndpointsSatisfied();
+
+ // give a sec for exception handling to finish..
+ Thread.sleep(500);
+
+ throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+ assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+ // since all exchanges ended w/ exception, they are not completed
+ Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+ assertEquals(0, completed.longValue());
+
+ }
+
+ public void testRejectedExecution() throws Exception {
+ // when delaying async, we can possibly fill up the execution queue
+ //. which would through a RejectedExecutionException.. we need to make
+ // sure that the delayedCount/throttledCount doesn't leak
+
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ // get the object name for the delayer
+ ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+ // use route to get the total time
+ ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+ // reset the counters
+ mbeanServer.invoke(routeName, "reset", null, null);
+
+ MockEndpoint mock = getMockEndpoint("mock:endAsyncReject");
+ // only one message (the first one) should get through because the rest should get delayed
+ mock.expectedMessageCount(1);
+
+ MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint1");
+ exceptionMock.expectedMessageCount(9);
+
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:throttleCountRejectExecution", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+
+ // we shouldn't have ane leaked throttler counts
+ Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+ assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+ }
+
+ public void testRejectedExecutionCallerRuns() throws Exception {
+ // when delaying async, we can possibly fill up the execution queue
+ //. which would through a RejectedExecutionException.. we need to make
+ // sure that the delayedCount/throttledCount doesn't leak
+
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ // get the object name for the delayer
+ ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+ // use route to get the total time
+ ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+ // reset the counters
+ mbeanServer.invoke(routeName, "reset", null, null);
+
+ MockEndpoint mock = getMockEndpoint("mock:endAsyncRejectCallerRuns");
+ // only one message (the first one) should get through because the rest should get delayed
+ mock.expectedMessageCount(10);
+
+ MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint");
+ exceptionMock.expectedMessageCount(0);
+
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:throttleCountRejectExecutionCallerRuns", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+
+ // we shouldn't have ane leaked throttler counts
+ Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+ assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
+ final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) {
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ throw new RejectedExecutionException();
+ }
+ };
+
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
- .to("log:foo")
- .throttle(10).id("mythrottler")
- .to("mock:result");
+ .to("log:foo")
+ .throttle(10).id("mythrottler")
+ .to("mock:result");
+
+ from("seda:throttleCount")
+ .throttle(1).timePeriodMillis(250).id("mythrottler2")
+ .to("mock:end");
+
+ from("seda:throttleCountAsync")
+ .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+ .to("mock:endAsync");
+
+ from("seda:throttleCountAsyncException")
+ .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+ .to("mock:endAsyncException")
+ .process(new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new RuntimeException("Fail me");
+ }
+ });
+ from("seda:throttleCountRejectExecutionCallerRuns")
+ .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+ .throttle(1)
+ .timePeriodMillis(250)
+ .asyncDelayed()
+ .executorService(badService)
+ .callerRunsWhenRejected(true)
+ .id("mythrottler5")
+ .to("mock:endAsyncRejectCallerRuns");
+
+ from("seda:throttleCountRejectExecution")
+ .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+ .throttle(1)
+ .timePeriodMillis(250)
+ .asyncDelayed()
+ .executorService(badService)
+ .callerRunsWhenRejected(false)
+ .id("mythrottler6")
+ .to("mock:endAsyncReject");
}
};
}