You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/23 19:12:37 UTC
svn commit: r1138998 - in /cassandra/branches/cassandra-0.8: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/tools/ src/java/org/apache/cassandra/utils/
Author: slebresne
Date: Thu Jun 23 17:12:36 2011
New Revision: 1138998
URL: http://svn.apache.org/viewvc?rev=1138998&view=rev
Log:
Merge from 0.7
Modified:
cassandra/branches/cassandra-0.8/ (props changed)
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/contrib/ (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/StatusLogger.java
Propchange: cassandra/branches/cassandra-0.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1138710
+/cassandra/branches/cassandra-0.7:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/branches/cassandra-0.8:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0:1125021-1130369
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Jun 23 17:12:36 2011
@@ -76,6 +76,8 @@
* fix repair hanging if a neighbor has nothing to send (CASSANDRA-2797)
* purge tombstone even if row is in only one sstable (CASSANDRA-2801)
* Fix wrong purge of deleted cf during compaction (CASSANDRA-2786)
+ * Expose number of threads blocked on submitting memtable to flush
+ (CASSANDRA-2817)
0.8.0-final
Propchange: cassandra/branches/cassandra-0.8/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1138710
+/cassandra/branches/cassandra-0.7/contrib:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 17:12:36 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710,1138996
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jun 23 17:12:36 2011
@@ -66,15 +66,22 @@ public class DebuggableThreadPoolExecuto
{
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
{
+ ((DebuggableThreadPoolExecutor)executor).onInitialRejection(task);
BlockingQueue<Runnable> queue = executor.getQueue();
while (true)
{
if (executor.isShutdown())
+ {
+ ((DebuggableThreadPoolExecutor)executor).onFinalRejection(task);
throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
+ }
try
{
if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ {
+ ((DebuggableThreadPoolExecutor)executor).onFinalAccept(task);
break;
+ }
}
catch (InterruptedException e)
{
@@ -85,6 +92,10 @@ public class DebuggableThreadPoolExecuto
});
}
+ protected void onInitialRejection(Runnable task) {}
+ protected void onFinalAccept(Runnable task) {}
+ protected void onFinalRejection(Runnable task) {}
+
@Override
public void afterExecute(Runnable r, Throwable t)
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Thu Jun 23 17:12:36 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
import java.lang.management.ManagementFactory;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,9 @@ public class JMXEnabledThreadPoolExecuto
{
private final String mbeanName;
+ private final AtomicInteger totalBlocked = new AtomicInteger(0);
+ private final AtomicInteger currentBlocked = new AtomicInteger(0);
+
public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), "internal");
@@ -129,4 +133,34 @@ public class JMXEnabledThreadPoolExecuto
{
return getTaskCount() - getCompletedTaskCount();
}
+
+ public int getTotalBlockedTasks()
+ {
+ return totalBlocked.get();
+ }
+
+ public int getCurrentlyBlockedTasks()
+ {
+ return currentBlocked.get();
+ }
+
+ @Override
+ protected void onInitialRejection(Runnable task)
+ {
+ totalBlocked.incrementAndGet();
+ currentBlocked.incrementAndGet();
+ }
+
+ @Override
+ protected void onFinalAccept(Runnable task)
+ {
+ currentBlocked.decrementAndGet();
+ }
+
+ @Override
+ protected void onFinalRejection(Runnable task)
+ {
+ currentBlocked.decrementAndGet();
+ }
+
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java Thu Jun 23 17:12:36 2011
@@ -20,4 +20,15 @@ package org.apache.cassandra.concurrent;
public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
{
-}
\ No newline at end of file
+ /**
+ * Get the number of tasks that had blocked before being accepted (or
+ * rejected).
+ */
+ public int getTotalBlockedTasks();
+
+ /**
+ * Get the number of tasks currently blocked, waiting to be accepted by
+ * the executor (because all threads are busy and the backing queue is full).
+ */
+ public int getCurrentlyBlockedTasks();
+}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Jun 23 17:12:36 2011
@@ -38,7 +38,7 @@ import org.apache.cassandra.config.Confi
import org.apache.commons.cli.*;
import org.apache.cassandra.cache.InstrumentingCacheMBean;
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.dht.Token;
@@ -219,16 +219,21 @@ public class NodeCmd
public void printThreadPoolStats(PrintStream outs)
{
- outs.printf("%-25s%10s%10s%15s%n", "Pool Name", "Active", "Pending", "Completed");
+ outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked");
- Iterator<Map.Entry<String, IExecutorMBean>> threads = probe.getThreadPoolMBeanProxies();
+ Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> threads = probe.getThreadPoolMBeanProxies();
while (threads.hasNext())
{
- Entry<String, IExecutorMBean> thread = threads.next();
+ Entry<String, JMXEnabledThreadPoolExecutorMBean> thread = threads.next();
String poolName = thread.getKey();
- IExecutorMBean threadPoolProxy = thread.getValue();
- outs.printf("%-25s%10s%10s%15s%n",
- poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCompletedTasks());
+ JMXEnabledThreadPoolExecutorMBean threadPoolProxy = thread.getValue();
+ outs.printf("%-25s%10s%10s%15s%10s%18s%n",
+ poolName,
+ threadPoolProxy.getActiveCount(),
+ threadPoolProxy.getPendingTasks(),
+ threadPoolProxy.getCompletedTasks(),
+ threadPoolProxy.getCurrentlyBlockedTasks(),
+ threadPoolProxy.getTotalBlockedTasks());
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Jun 23 17:12:36 2011
@@ -42,7 +42,7 @@ import javax.management.remote.JMXServic
import com.google.common.collect.Iterables;
import org.apache.cassandra.cache.InstrumentingCacheMBean;
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -376,7 +376,7 @@ public class NodeProbe
ssProxy.forceRemoveCompletion();
}
- public Iterator<Map.Entry<String, IExecutorMBean>> getThreadPoolMBeanProxies()
+ public Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> getThreadPoolMBeanProxies()
{
try
{
@@ -627,7 +627,7 @@ class ColumnFamilyStoreMBeanIterator imp
}
}
-class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, IExecutorMBean>>
+class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>>
{
private Iterator<ObjectName> resIter;
private MBeanServerConnection mbeanServerConn;
@@ -646,12 +646,12 @@ class ThreadPoolProxyMBeanIterator imple
return resIter.hasNext();
}
- public Map.Entry<String, IExecutorMBean> next()
+ public Map.Entry<String, JMXEnabledThreadPoolExecutorMBean> next()
{
ObjectName objectName = resIter.next();
String poolName = objectName.getKeyProperty("type");
- IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, IExecutorMBean.class);
- return new AbstractMap.SimpleImmutableEntry<String, IExecutorMBean>(poolName, threadPoolProxy);
+ JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, JMXEnabledThreadPoolExecutorMBean.class);
+ return new AbstractMap.SimpleImmutableEntry<String, JMXEnabledThreadPoolExecutorMBean>(poolName, threadPoolProxy);
}
public void remove()
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/StatusLogger.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/StatusLogger.java?rev=1138998&r1=1138997&r2=1138998&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/StatusLogger.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/StatusLogger.java Thu Jun 23 17:12:36 2011
@@ -33,7 +33,7 @@ import com.google.common.collect.Iterabl
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.MessagingService;
@@ -47,7 +47,7 @@ public class StatusLogger
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// everything from o.a.c.concurrent
- logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", "Pending"));
+ logger.info(String.format("%-25s%10s%10s%10s", "Pool Name", "Active", "Pending", "Blocked"));
Set<ObjectName> request, internal;
try
{
@@ -61,9 +61,9 @@ public class StatusLogger
for (ObjectName objectName : Iterables.concat(request, internal))
{
String poolName = objectName.getKeyProperty("type");
- IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, IExecutorMBean.class);
- logger.info(String.format("%-25s%10s%10s",
- poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks()));
+ JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, JMXEnabledThreadPoolExecutorMBean.class);
+ logger.info(String.format("%-25s%10s%10s%10s",
+ poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCurrentlyBlockedTasks()));
}
// one offs
logger.info(String.format("%-25s%10s%10s",