You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/21 10:39:16 UTC

camel git commit: CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks

Repository: camel
Updated Branches:
  refs/heads/master fe80773ee -> 9ab0bd5c6


CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ab0bd5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ab0bd5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ab0bd5c

Branch: refs/heads/master
Commit: 9ab0bd5c677983d7bdec945e76ee6739640c7ff0
Parents: fe80773
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Dec 21 10:39:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Dec 21 10:39:08 2014 +0100

----------------------------------------------------------------------
 .../ManagedAsyncProcessorAwaitManagerMBean.java |  27 +++++
 .../impl/DefaultAsyncProcessorAwaitManager.java | 102 ++++++++++++++++++-
 .../ManagedAsyncProcessorAwaitManager.java      |  45 ++++++++
 .../camel/spi/AsyncProcessorAwaitManager.java   |  61 +++++++++++
 ...AsyncProcessorAwaitManagerInterruptTest.java |   4 +
 .../async/AsyncProcessorAwaitManagerTest.java   |   4 +
 6 files changed, 242 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
index bb5b669..8683b38 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
@@ -38,4 +38,31 @@ public interface ManagedAsyncProcessorAwaitManagerMBean extends ManagedServiceMB
     @ManagedOperation(description = "To interrupt an exchange which may seem as stuck, to force the exchange to continue, allowing any blocking thread to be released.")
     void interrupt(String exchangeId);
 
+    @ManagedAttribute(description = "Number of threads that has been blocked")
+    long getThreadsBlocked();
+
+    @ManagedAttribute(description = "Number of threads that has been interrupted")
+    long getThreadsInterrupted();
+
+    @ManagedAttribute(description = "Total wait time in msec.")
+    long getTotalDuration();
+
+    @ManagedAttribute(description = "The minimum wait time in msec.")
+    long getMinDuration();
+
+    @ManagedAttribute(description = "The maximum wait time in msec.")
+    long getMaxDuration();
+
+    @ManagedAttribute(description = "The average wait time in msec.")
+    long getMeanDuration();
+
+    @ManagedOperation(description = "Resets the statistics")
+    void resetStatistics();
+
+    @ManagedAttribute(description = "Utilization statistics enabled")
+    boolean isStatisticsEnabled();
+
+    @ManagedAttribute(description = "Utilization statistics enabled")
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 20c2927..2712178 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
@@ -39,6 +40,14 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
 
+    private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();
+    private final AtomicLong blockedCounter = new AtomicLong();
+    private final AtomicLong interruptedCounter = new AtomicLong();
+    private final AtomicLong totalDuration = new AtomicLong();
+    private final AtomicLong minDuration = new AtomicLong();
+    private final AtomicLong maxDuration = new AtomicLong();
+    private final AtomicLong meanDuration = new AtomicLong();
+
     private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
     private final ExchangeFormatter exchangeFormatter;
     private boolean interruptThreadsWhileStopping = true;
@@ -58,6 +67,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
+            if (statistics.isStatisticsEnabled()) {
+                blockedCounter.incrementAndGet();
+            }
             inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
             latch.await();
             LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
@@ -68,7 +80,24 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
                     exchange.getExchangeId(), exchange);
             exchange.setException(e);
         } finally {
-            inflight.remove(exchange);
+            AwaitThread thread = inflight.remove(exchange);
+
+            if (statistics.isStatisticsEnabled() && thread != null) {
+                long time = thread.getWaitDuration();
+                long total = totalDuration.get() + time;
+                totalDuration.set(total);
+
+                if (time < minDuration.get()) {
+                    minDuration.set(time);
+                } else if (time > maxDuration.get()) {
+                    maxDuration.set(time);
+                }
+
+                // update mean
+                long count = blockedCounter.get();
+                long mean = count > 0 ? total / count : 0;
+                meanDuration.set(mean);
+            }
         }
     }
 
@@ -127,6 +156,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             } finally {
+                if (statistics.isStatisticsEnabled()) {
+                    interruptedCounter.incrementAndGet();
+                }
                 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
                 entry.getLatch().countDown();
             }
@@ -141,6 +173,10 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
     }
 
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
     @Override
     protected void doStart() throws Exception {
         // noop
@@ -258,4 +294,68 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         }
     }
 
+    /**
+     * Utilization statistics view over the enclosing manager's atomic counters; collection is disabled until explicitly enabled.
+     */
+    private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics {
+
+        private volatile boolean statisticsEnabled; // volatile: set through the statistics API/JMX, read by the threads that block in the manager
+
+        @Override
+        public long getThreadsBlocked() {
+            return blockedCounter.get();
+        }
+
+        @Override
+        public long getThreadsInterrupted() {
+            return interruptedCounter.get();
+        }
+
+        @Override
+        public long getTotalDuration() {
+            return totalDuration.get();
+        }
+
+        @Override
+        public long getMinDuration() {
+            return minDuration.get();
+        }
+
+        @Override
+        public long getMaxDuration() {
+            return maxDuration.get();
+        }
+
+        @Override
+        public long getMeanDuration() {
+            return meanDuration.get();
+        }
+
+        @Override
+        public void reset() {
+            blockedCounter.set(0);
+            interruptedCounter.set(0);
+            totalDuration.set(0);
+            minDuration.set(0);
+            maxDuration.set(0);
+            meanDuration.set(0);
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("AsyncProcessorAwaitManager utilization[blocked=%s, interrupted=%s, total=%s, min=%s, max=%s, mean=%s]",
+                    getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
index a4759ef..5659127 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -92,4 +92,49 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
         manager.interrupt(exchangeId);
     }
 
+    @Override
+    public long getThreadsBlocked() {
+        return manager.getStatistics().getThreadsBlocked();
+    }
+
+    @Override
+    public long getThreadsInterrupted() {
+        return manager.getStatistics().getThreadsInterrupted();
+    }
+
+    @Override
+    public long getTotalDuration() {
+        return manager.getStatistics().getTotalDuration();
+    }
+
+    @Override
+    public long getMinDuration() {
+        return manager.getStatistics().getMinDuration();
+    }
+
+    @Override
+    public long getMaxDuration() {
+        return manager.getStatistics().getMaxDuration();
+    }
+
+    @Override
+    public long getMeanDuration() {
+        return manager.getStatistics().getMeanDuration();
+    }
+
+    @Override
+    public void resetStatistics() {
+        manager.getStatistics().reset();
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return manager.getStatistics().isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        manager.getStatistics().setStatisticsEnabled(statisticsEnabled);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 3081449..d4f8bdd 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -33,6 +33,59 @@ import org.apache.camel.StaticService;
 public interface AsyncProcessorAwaitManager extends StaticService {
 
     /**
+     * Utilization statistics of this manager.
+     */
+    interface Statistics {
+
+        /**
+         * Total number of threads that have been blocked
+         */
+        long getThreadsBlocked();
+
+        /**
+         * Total number of threads that have been forcibly interrupted
+         */
+        long getThreadsInterrupted();
+
+        /**
+         * The total wait time in millis.
+         */
+        long getTotalDuration();
+
+        /**
+         * The lowest wait time in millis.
+         */
+        long getMinDuration();
+
+        /**
+         * The highest wait time in millis.
+         */
+        long getMaxDuration();
+
+        /**
+         * The average wait time in millis.
+         */
+        long getMeanDuration();
+
+        /**
+         * Resets the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics are enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics are enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
+
+    /**
      * Information about the thread and exchange that are inflight.
      */
     interface AwaitThread {
@@ -136,4 +189,12 @@ public interface AsyncProcessorAwaitManager extends StaticService {
      * This is enabled by default which allows Camel to release any blocked thread during shutting down Camel itself.
      */
     void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+
+    /**
+     * Gets the utilization statistics of this manager
+     *
+     * @return the utilization statistics
+     */
+    Statistics getStatistics();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index 2d97c80..c53aa9e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -32,6 +32,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport {
 
     public void testAsyncAwaitInterrupt() throws Exception {
+        context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -49,6 +51,8 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
         assertMockEndpointsSatisfied();
 
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+        assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+        assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index 8389069..1fc4635 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -30,6 +30,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
 
     public void testAsyncAwait() throws Exception {
+        context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -42,6 +44,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+        assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+        assertEquals(0, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
     }
 
     @Override