You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/06 04:27:55 UTC
svn commit: r772030 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/ db/
net/ service/
Author: jbellis
Date: Wed May 6 02:27:54 2009
New Revision: 772030
URL: http://svn.apache.org/viewvc?rev=772030&view=rev
Log:
add pending tasks mbean to all DebuggableTPE. clean out ad-hoc queue length logging.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-135
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ContinuationStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ContinuationStage.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ContinuationStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ContinuationStage.java Wed May 6 02:27:54 2009
@@ -85,7 +85,7 @@
return executorService_.isShutdown();
}
- public long getTaskCount(){
+ public long getPendingTasks(){
return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed May 6 02:27:54 2009
@@ -19,10 +19,12 @@
package org.apache.cassandra.concurrent;
import java.util.concurrent.*;
+import java.lang.management.ManagementFactory;
-import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
/**
* This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
@@ -32,7 +34,7 @@
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
{
private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
@@ -46,12 +48,26 @@
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory)
+ ThreadFactoryImpl threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id_));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
-
+
+ public long getPendingTasks()
+ {
+ return getTaskCount() - getCompletedTaskCount();
+ }
+
/*
*
* (non-Javadoc)
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java?rev=772030&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java Wed May 6 02:27:54 2009
@@ -0,0 +1,6 @@
+package org.apache.cassandra.concurrent;
+
+public interface DebuggableThreadPoolExecutorMBean
+{
+ public long getPendingTasks();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java Wed May 6 02:27:54 2009
@@ -116,5 +116,5 @@
* pending on this stage to be executed.
* @return task count.
*/
- public long getTaskCount();
+ public long getPendingTasks();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java Wed May 6 02:27:54 2009
@@ -92,7 +92,7 @@
return executorService_.isShutdown();
}
- public long getTaskCount(){
- return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ public long getPendingTasks(){
+ return executorService_.getPendingTasks();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java Wed May 6 02:27:54 2009
@@ -92,7 +92,7 @@
return executorService_.isShutdown();
}
- public long getTaskCount(){
+ public long getPendingTasks(){
return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
}
/* Finished implementing the IStage interface methods */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java Wed May 6 02:27:54 2009
@@ -94,8 +94,7 @@
return executorService_.isShutdown();
}
- public long getTaskCount(){
- return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ public long getPendingTasks(){
+ return executorService_.getPendingTasks();
}
- /* Finished implementing the IStage interface methods */
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Wed May 6 02:27:54 2009
@@ -23,8 +23,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import org.apache.cassandra.continuations.Suspendable;
-
/**
* This class manages all stages that exist within a process. The application registers
@@ -101,7 +99,7 @@
*/
public static long getStageTaskCount(String stage)
{
- return stageQueues_.get(stage).getTaskCount();
+ return stageQueues_.get(stage).getPendingTasks();
}
/**
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 6 02:27:54 2009
@@ -1432,11 +1432,6 @@
return memtableSwitchCount;
}
- public int getMemtableTasks()
- {
- return memtable_.get().getPendingTasks();
- }
-
/**
* clears out all data associated with this ColumnFamily.
* For use in testing.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed May 6 02:27:54 2009
@@ -50,11 +50,6 @@
public int getMemtableSwitchCount();
/**
- * @return the number of tasks waiting to run on the memtable executor
- */
- public int getMemtableTasks();
-
- /**
* Triggers an immediate memtable flush.
*/
public void forceFlush();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed May 6 02:27:54 2009
@@ -45,7 +45,7 @@
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
- private boolean isFrozen_;
+ private static AtomicInteger executorCount_ = new AtomicInteger(0);
public static void shutdown()
{
@@ -56,6 +56,7 @@
}
private MemtableThreadPoolExecutor executor_;
+ private boolean isFrozen_;
private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
private int thresholdCount_ = (int)(DatabaseDescriptor.getMemtableObjectCount()*1024*1024);
@@ -72,12 +73,12 @@
Memtable(String table, String cfName)
{
- executor_ = new MemtableThreadPoolExecutor();
- runningExecutorServices_.add(executor_);
-
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
+
+ executor_ = new MemtableThreadPoolExecutor();
+ runningExecutorServices_.add(executor_);
}
class Putter implements Runnable
@@ -169,11 +170,6 @@
return cfName_;
}
- int getPendingTasks()
- {
- return (int)(executor_.getTaskCount() - executor_.getCompletedTaskCount());
- }
-
private synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
{
if (!isFrozen_)
@@ -367,7 +363,7 @@
public MemtableThreadPoolExecutor()
{
- super("FAST-MEMTABLE-POOL");
+ super("MEMTABLE-POOL-" + cfName_ + executorCount_.addAndGet(1));
}
protected void terminated()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Wed May 6 02:27:54 2009
@@ -48,23 +48,20 @@
public void run()
{
- /* For DEBUG only. Printing queue length */
- DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getDeserilizationExecutor();
- logger_.debug( "Message Deserialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
- /* END DEBUG */
+ Message message = null;
try
- {
- Message message = (Message)serializer_.deserialize(bytes_);
-
- if ( message != null )
- {
- message = SinkManager.processServerMessageSink(message);
- MessagingService.receive(message);
- }
+ {
+ message = serializer_.deserialize(bytes_);
}
- catch ( IOException ex )
- {
- logger_.warn(LogUtil.throwableToString(ex));
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if ( message != null )
+ {
+ message = SinkManager.processServerMessageSink(message);
+ MessagingService.receive(message);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java Wed May 6 02:27:54 2009
@@ -52,11 +52,6 @@
public void run()
{
- /* For DEBUG only. Printing queue length */
- DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getWriteExecutor();
- logger_.debug( "Message Serialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
- /* END DEBUG */
-
/* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
Context ctx = new Context();
ctx.put(this.getClass().getName(), message_);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed May 6 02:27:54 2009
@@ -18,43 +18,32 @@
package org.apache.cassandra.net;
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.security.MessageDigest;
-import java.util.*;
-import java.nio.ByteBuffer;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-import java.nio.channels.*;
import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.net.io.*;
-import org.apache.cassandra.utils.*;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.xml.bind.*;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.IStage;
-import org.apache.cassandra.concurrent.MultiThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.http.HttpConnectionHandler;
import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.HashingSchemes;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.*;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.net.MulticastSocket;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.security.MessageDigest;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public class MessagingService implements IMessagingService, MessagingServiceMBean
+public class MessagingService implements IMessagingService
{
private static boolean debugOn_ = false;
@@ -243,18 +232,6 @@
return result;
}
- public long getMessagingSerializerTaskCount()
- {
- DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageSerializerExecutor_;
- return dstp.getTaskCount() - dstp.getCompletedTaskCount();
- }
-
- public long getMessagingReceiverTaskCount()
- {
- DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageDeserializationExecutor_;
- return dstp.getTaskCount() - dstp.getCompletedTaskCount();
- }
-
public void listen(EndPoint localEp, boolean isHttp) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed May 6 02:27:54 2009
@@ -174,7 +174,7 @@
byte[] data = serializer_.serialize(message);
if ( data.length > 0 )
{
- boolean listening = ( message.getFrom().equals(EndPoint.randomLocalEndPoint_) ) ? false : true;
+ boolean listening = !message.getFrom().equals(EndPoint.randomLocalEndPoint_);
ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);
synchronized(this)
{
@@ -184,7 +184,6 @@
return;
}
- logger_.debug("Sending packets of size " + data.length);
socketChannel_.write(buffer);
if (buffer.remaining() > 0)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java Wed May 6 02:27:54 2009
@@ -78,7 +78,7 @@
byte[] data = bos.toByteArray();
if ( data.length > 0 )
{
- logger_.debug("Size of Gossip packet " + data.length);
+ logger_.trace("Size of Gossip packet " + data.length);
byte[] protocol = BasicUtilities.intToByteArray(protocol_);
ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
buffer.put( protocol );
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=772030&r1=772029&r2=772030&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed May 6 02:27:54 2009
@@ -263,7 +263,6 @@
*/
private void init()
{
- // Register this instance with JMX
try
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();