You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/23 19:10:48 UTC

svn commit: r1138996 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/tools/ src/java/org/apache/cassandra/utils/

Author: slebresne
Date: Thu Jun 23 17:10:48 2011
New Revision: 1138996

URL: http://svn.apache.org/viewvc?rev=1138996&view=rev
Log:
Expose number of threads blocked on submitting a memtable for flush
patch by slebresne; reviewed by jbellis for CASSANDRA-2817

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Jun 23 17:10:48 2011
@@ -20,6 +20,8 @@
    supercolumn tombstone during replica resolution (CASSANDRA-2590)
  * use threadsafe collections for StreamInSession and StreamOutSession
    (CASSANDRA-2766, CASSANDRA-2792)
+ * Expose number of threads blocked on submitting memtable to flush
+   (CASSANDRA-2817)
 
 
 0.7.6

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jun 23 17:10:48 2011
@@ -66,15 +66,22 @@ public class DebuggableThreadPoolExecuto
         {
             public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
             {
+                ((DebuggableThreadPoolExecutor)executor).onInitialRejection(task);
                 BlockingQueue<Runnable> queue = executor.getQueue();
                 while (true)
                 {
                     if (executor.isShutdown())
+                    {
+                        ((DebuggableThreadPoolExecutor)executor).onFinalRejection(task);
                         throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
+                    }
                     try
                     {
                         if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                        {
+                            ((DebuggableThreadPoolExecutor)executor).onFinalAccept(task);
                             break;
+                        }
                     }
                     catch (InterruptedException e)
                     {
@@ -85,6 +92,10 @@ public class DebuggableThreadPoolExecuto
         });
     }
 
+    protected void onInitialRejection(Runnable task) {}
+    protected void onFinalAccept(Runnable task) {}
+    protected void onFinalRejection(Runnable task) {}
+
     @Override
     public void afterExecute(Runnable r, Throwable t)
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Thu Jun 23 17:10:48 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
 
 import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +37,9 @@ public class JMXEnabledThreadPoolExecuto
 {
     private final String mbeanName;
 
+    private final AtomicInteger totalBlocked = new AtomicInteger(0);
+    private final AtomicInteger currentBlocked = new AtomicInteger(0);
+
     public JMXEnabledThreadPoolExecutor(String threadPoolName)
     {
         this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), "internal");
@@ -129,4 +133,34 @@ public class JMXEnabledThreadPoolExecuto
     {
         return getTaskCount() - getCompletedTaskCount();
     }
+
+    public int getTotalBlockedTasks()
+    {
+        return totalBlocked.get();
+    }
+
+    public int getCurrentlyBlockedTasks()
+    {
+        return currentBlocked.get();
+    }
+
+    @Override
+    protected void onInitialRejection(Runnable task)
+    {
+        totalBlocked.incrementAndGet();
+        currentBlocked.incrementAndGet();
+    }
+
+    @Override
+    protected void onFinalAccept(Runnable task)
+    {
+        currentBlocked.decrementAndGet();
+    }
+
+    @Override
+    protected void onFinalRejection(Runnable task)
+    {
+        currentBlocked.decrementAndGet();
+    }
+
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java Thu Jun 23 17:10:48 2011
@@ -20,4 +20,15 @@ package org.apache.cassandra.concurrent;
 
 public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
 {
-}
\ No newline at end of file
+    /**
+     * Get the number of tasks that had blocked before being accepted (or
+     * rejected).
+     */
+    public int getTotalBlockedTasks();
+
+    /**
+     * Get the number of tasks currently blocked, waiting to be accepted by
+     * the executor (because all threads are busy and the backing queue is full).
+     */
+    public int getCurrentlyBlockedTasks();
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Jun 23 17:10:48 2011
@@ -36,7 +36,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.dht.Token;
@@ -189,16 +189,21 @@ public class NodeCmd
 
     public void printThreadPoolStats(PrintStream outs)
     {
-        outs.printf("%-25s%10s%10s%15s%n", "Pool Name", "Active", "Pending", "Completed");
+        outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked");
 
-        Iterator<Map.Entry<String, IExecutorMBean>> threads = probe.getThreadPoolMBeanProxies();
+        Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> threads = probe.getThreadPoolMBeanProxies();
         while (threads.hasNext())
         {
-            Entry<String, IExecutorMBean> thread = threads.next();
+            Entry<String, JMXEnabledThreadPoolExecutorMBean> thread = threads.next();
             String poolName = thread.getKey();
-            IExecutorMBean threadPoolProxy = thread.getValue();
-            outs.printf("%-25s%10s%10s%15s%n",
-                        poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCompletedTasks());
+            JMXEnabledThreadPoolExecutorMBean threadPoolProxy = thread.getValue();
+            outs.printf("%-25s%10s%10s%15s%10s%18s%n",
+                        poolName,
+                        threadPoolProxy.getActiveCount(),
+                        threadPoolProxy.getPendingTasks(),
+                        threadPoolProxy.getCompletedTasks(),
+                        threadPoolProxy.getCurrentlyBlockedTasks(),
+                        threadPoolProxy.getTotalBlockedTasks());
         }
     }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Jun 23 17:10:48 2011
@@ -40,7 +40,7 @@ import javax.management.remote.JMXServic
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.CompactionManager;
@@ -372,7 +372,7 @@ public class NodeProbe
         ssProxy.forceRemoveCompletion();
     }
   
-    public Iterator<Map.Entry<String, IExecutorMBean>> getThreadPoolMBeanProxies()
+    public Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> getThreadPoolMBeanProxies()
     {
         try
         {
@@ -587,7 +587,7 @@ class ColumnFamilyStoreMBeanIterator imp
     }
 }
 
-class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, IExecutorMBean>>
+class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>>
 {
     private Iterator<ObjectName> resIter;
     private MBeanServerConnection mbeanServerConn;
@@ -606,12 +606,12 @@ class ThreadPoolProxyMBeanIterator imple
         return resIter.hasNext();
     }
 
-    public Map.Entry<String, IExecutorMBean> next()
+    public Map.Entry<String, JMXEnabledThreadPoolExecutorMBean> next()
     {
         ObjectName objectName = resIter.next();
         String poolName = objectName.getKeyProperty("type");
-        IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, IExecutorMBean.class);
-        return new AbstractMap.SimpleImmutableEntry<String, IExecutorMBean>(poolName, threadPoolProxy);
+        JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, JMXEnabledThreadPoolExecutorMBean.class);
+        return new AbstractMap.SimpleImmutableEntry<String, JMXEnabledThreadPoolExecutorMBean>(poolName, threadPoolProxy);
     }
 
     public void remove()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java?rev=1138996&r1=1138995&r2=1138996&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java Thu Jun 23 17:10:48 2011
@@ -33,7 +33,7 @@ import com.google.common.collect.Iterabl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
@@ -48,7 +48,7 @@ public class StatusLogger
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         
         // everything from o.a.c.concurrent
-        logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", "Pending"));
+        logger.info(String.format("%-25s%10s%10s%10s", "Pool Name", "Active", "Pending", "Blocked"));
         Set<ObjectName> request, internal;
         try
         {
@@ -62,9 +62,9 @@ public class StatusLogger
         for (ObjectName objectName : Iterables.concat(request, internal))
         {
             String poolName = objectName.getKeyProperty("type");
-            IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, IExecutorMBean.class);
-            logger.info(String.format("%-25s%10s%10s",
-                                      poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks()));
+            JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, JMXEnabledThreadPoolExecutorMBean.class);
+            logger.info(String.format("%-25s%10s%10s%10s",
+                                      poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCurrentlyBlockedTasks()));
         }
         // one offs
         logger.info(String.format("%-25s%10s%10s",