You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/21 10:39:16 UTC
camel git commit: CAMEL-8165: Async routing engine - Add insight into
threads blocked waiting for callbacks
Repository: camel
Updated Branches:
refs/heads/master fe80773ee -> 9ab0bd5c6
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ab0bd5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ab0bd5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ab0bd5c
Branch: refs/heads/master
Commit: 9ab0bd5c677983d7bdec945e76ee6739640c7ff0
Parents: fe80773
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Dec 21 10:39:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Dec 21 10:39:08 2014 +0100
----------------------------------------------------------------------
.../ManagedAsyncProcessorAwaitManagerMBean.java | 27 +++++
.../impl/DefaultAsyncProcessorAwaitManager.java | 102 ++++++++++++++++++-
.../ManagedAsyncProcessorAwaitManager.java | 45 ++++++++
.../camel/spi/AsyncProcessorAwaitManager.java | 61 +++++++++++
...AsyncProcessorAwaitManagerInterruptTest.java | 4 +
.../async/AsyncProcessorAwaitManagerTest.java | 4 +
6 files changed, 242 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
index bb5b669..8683b38 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
@@ -38,4 +38,31 @@ public interface ManagedAsyncProcessorAwaitManagerMBean extends ManagedServiceMB
@ManagedOperation(description = "To interrupt an exchange which may seem as stuck, to force the exchange to continue, allowing any blocking thread to be released.")
void interrupt(String exchangeId);
+ @ManagedAttribute(description = "Number of threads that has been blocked")
+ long getThreadsBlocked();
+
+ @ManagedAttribute(description = "Number of threads that has been interrupted")
+ long getThreadsInterrupted();
+
+ @ManagedAttribute(description = "Total wait time in msec.")
+ long getTotalDuration();
+
+ @ManagedAttribute(description = "The minimum wait time in msec.")
+ long getMinDuration();
+
+ @ManagedAttribute(description = "The maximum wait time in msec.")
+ long getMaxDuration();
+
+ @ManagedAttribute(description = "The average wait time in msec.")
+ long getMeanDuration();
+
+ @ManagedOperation(description = "Resets the statistics")
+ void resetStatistics();
+
+ @ManagedAttribute(description = "Utilization statistics enabled")
+ boolean isStatisticsEnabled();
+
+ @ManagedAttribute(description = "Utilization statistics enabled")
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 20c2927..2712178 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
@@ -39,6 +40,14 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
+ private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();
+ private final AtomicLong blockedCounter = new AtomicLong();
+ private final AtomicLong interruptedCounter = new AtomicLong();
+ private final AtomicLong totalDuration = new AtomicLong();
+ private final AtomicLong minDuration = new AtomicLong();
+ private final AtomicLong maxDuration = new AtomicLong();
+ private final AtomicLong meanDuration = new AtomicLong();
+
private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
private final ExchangeFormatter exchangeFormatter;
private boolean interruptThreadsWhileStopping = true;
@@ -58,6 +67,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
try {
+ if (statistics.isStatisticsEnabled()) {
+ blockedCounter.incrementAndGet();
+ }
inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
latch.await();
LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
@@ -68,7 +80,24 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
exchange.getExchangeId(), exchange);
exchange.setException(e);
} finally {
- inflight.remove(exchange);
+ AwaitThread thread = inflight.remove(exchange);
+
+ if (statistics.isStatisticsEnabled() && thread != null) {
+ long time = thread.getWaitDuration();
+ long total = totalDuration.get() + time;
+ totalDuration.set(total);
+
+ if (time < minDuration.get()) {
+ minDuration.set(time);
+ } else if (time > maxDuration.get()) {
+ maxDuration.set(time);
+ }
+
+ // update mean
+ long count = blockedCounter.get();
+ long mean = count > 0 ? total / count : 0;
+ meanDuration.set(mean);
+ }
}
}
@@ -127,6 +156,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
+ if (statistics.isStatisticsEnabled()) {
+ interruptedCounter.incrementAndGet();
+ }
exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
entry.getLatch().countDown();
}
@@ -141,6 +173,10 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
}
+ public Statistics getStatistics() {
+ return statistics;
+ }
+
@Override
protected void doStart() throws Exception {
// noop
@@ -258,4 +294,68 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
}
}
+ /**
+ * Utilization statistics backed by the counters of the enclosing manager.
+ */
+ private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics {
+
+ private boolean statisticsEnabled;
+
+ @Override
+ public long getThreadsBlocked() {
+ return blockedCounter.get();
+ }
+
+ @Override
+ public long getThreadsInterrupted() {
+ return interruptedCounter.get();
+ }
+
+ @Override
+ public long getTotalDuration() {
+ return totalDuration.get();
+ }
+
+ @Override
+ public long getMinDuration() {
+ return minDuration.get();
+ }
+
+ @Override
+ public long getMaxDuration() {
+ return maxDuration.get();
+ }
+
+ @Override
+ public long getMeanDuration() {
+ return meanDuration.get();
+ }
+
+ @Override
+ public void reset() {
+ blockedCounter.set(0);
+ interruptedCounter.set(0);
+ totalDuration.set(0);
+ minDuration.set(0);
+ maxDuration.set(0);
+ meanDuration.set(0);
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]",
+ getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
index a4759ef..5659127 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -92,4 +92,49 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
manager.interrupt(exchangeId);
}
+ @Override
+ public long getThreadsBlocked() {
+ return manager.getStatistics().getThreadsBlocked();
+ }
+
+ @Override
+ public long getThreadsInterrupted() {
+ return manager.getStatistics().getThreadsInterrupted();
+ }
+
+ @Override
+ public long getTotalDuration() {
+ return manager.getStatistics().getTotalDuration();
+ }
+
+ @Override
+ public long getMinDuration() {
+ return manager.getStatistics().getMinDuration();
+ }
+
+ @Override
+ public long getMaxDuration() {
+ return manager.getStatistics().getMaxDuration();
+ }
+
+ @Override
+ public long getMeanDuration() {
+ return manager.getStatistics().getMeanDuration();
+ }
+
+ @Override
+ public void resetStatistics() {
+ manager.getStatistics().reset();
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return manager.getStatistics().isStatisticsEnabled();
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ manager.getStatistics().setStatisticsEnabled(statisticsEnabled);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 3081449..d4f8bdd 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -33,6 +33,59 @@ import org.apache.camel.StaticService;
public interface AsyncProcessorAwaitManager extends StaticService {
/**
+ * Utilization statistics of this manager.
+ */
+ interface Statistics {
+
+ /**
+ * Total number of threads that have been blocked
+ */
+ long getThreadsBlocked();
+
+ /**
+ * Total number of threads that have been forcibly interrupted
+ */
+ long getThreadsInterrupted();
+
+ /**
+ * The total wait duration in millis.
+ */
+ long getTotalDuration();
+
+ /**
+ * The lowest wait duration in millis.
+ */
+ long getMinDuration();
+
+ /**
+ * The highest wait duration in millis.
+ */
+ long getMaxDuration();
+
+ /**
+ * The average wait duration in millis.
+ */
+ long getMeanDuration();
+
+ /**
+ * Resets the counters
+ */
+ void reset();
+
+ /**
+ * Whether statistics are enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics are enabled.
+ *
+ * @param statisticsEnabled <tt>true</tt> to enable
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+ }
+
+ /**
* Information about the thread and exchange that are inflight.
*/
interface AwaitThread {
@@ -136,4 +189,12 @@ public interface AsyncProcessorAwaitManager extends StaticService {
* This is enabled by default which allows Camel to release any blocked thread during shutting down Camel itself.
*/
void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+
+ /**
+ * Gets the utilization statistics of this manager
+ *
+ * @return the utilization statistics
+ */
+ Statistics getStatistics();
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index 2d97c80..c53aa9e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -32,6 +32,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport {
public void testAsyncAwaitInterrupt() throws Exception {
+ context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
assertEquals(0, context.getAsyncProcessorAwaitManager().size());
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -49,6 +51,8 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
assertMockEndpointsSatisfied();
assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+ assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+ assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index 8389069..1fc4635 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -30,6 +30,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
public void testAsyncAwait() throws Exception {
+ context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
assertEquals(0, context.getAsyncProcessorAwaitManager().size());
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -42,6 +44,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+ assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+ assertEquals(0, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
}
@Override