You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/11 21:00:00 UTC

svn commit: r898035 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/ src/java/org/apache...

Author: jbellis
Date: Mon Jan 11 19:59:58 2010
New Revision: 898035

URL: http://svn.apache.org/viewvc?rev=898035&view=rev
Log:
centralize stage creation in StageManager and standardize stage naming
patch by jbellis; reviewed by gdusbabek for CASSANDRA-684

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Mon Jan 11 19:59:58 2010
@@ -21,55 +21,39 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.net.MessagingService;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
 import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
 
 
 /**
- * This class manages all stages that exist within a process. The application registers
- * and de-registers stages with this abstraction. Any component that has the <i>ID</i> 
- * associated with a stage can obtain a handle to actual stage.
+ * This class manages executor services for Messages recieved: each Message requests
+ * running on a specific "stage" for concurrency control; hence the Map approach,
+ * even though stages (executors) are not created dynamically.
  */
-
 public class StageManager
 {
-    private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+    private static Map<String, IStage> stageQueues = new HashMap<String, IStage>();
 
-    public final static String readStage_ = "ROW-READ-STAGE";
-    public final static String mutationStage_ = "ROW-MUTATION-STAGE";
-    public final static String streamStage_ = "STREAM-STAGE";
+    public final static String READ_STAGE = "ROW-READ-STAGE";
+    public final static String MUTATION_STAGE = "ROW-MUTATION-STAGE";
+    public final static String STREAM_STAGE = "STREAM-STAGE";
+    public final static String GOSSIP_STAGE = "GS";
+    public static final String RESPONSE_STAGE = "RESPONSE-STAGE";
+    public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
+    private static final String LOADBALANCE_STAGE = "LOAD-BALANCER-STAGE";
 
     static
     {
-        StageManager.registerStage(mutationStage_, new MultiThreadedStage(mutationStage_, getConcurrentWriters()));
-        StageManager.registerStage(readStage_, new MultiThreadedStage(readStage_, getConcurrentReaders()));
-        StageManager.registerStage(streamStage_, new SingleThreadedStage(streamStage_));
-    }
-    
-    /**
-     * Register a stage with the StageManager
-     * @param stageName stage name.
-     * @param stage stage for the respective message types.
-     */
-    public static void registerStage(String stageName, IStage stage)
-    {
-        stageQueues_.put(stageName, stage);
-    }
-    
-    /**
-     * Returns the stage that we are currently executing on.
-     * This relies on the fact that the thread names in the
-     * stage have the name of the stage as the prefix.
-     * @return Returns the stage that we are currently executing on.
-     */
-    public static IStage getCurrentStage()
-    {
-        String name = Thread.currentThread().getName();
-        String[] peices = name.split(":");
-        IStage stage = getStage(peices[0]);
-        return stage;
+        stageQueues.put(MUTATION_STAGE, new MultiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
+        stageQueues.put(READ_STAGE, new MultiThreadedStage(READ_STAGE, getConcurrentReaders()));
+        stageQueues.put(STREAM_STAGE, new SingleThreadedStage(STREAM_STAGE));
+        stageQueues.put(GOSSIP_STAGE, new SingleThreadedStage("GMFD"));
+        stageQueues.put(RESPONSE_STAGE, new MultiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
+        stageQueues.put(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
+        stageQueues.put(LOADBALANCE_STAGE, new SingleThreadedStage(LOADBALANCE_STAGE));
     }
 
     /**
@@ -78,51 +62,18 @@
     */
     public static IStage getStage(String stageName)
     {
-        return stageQueues_.get(stageName);
+        return stageQueues.get(stageName);
     }
     
     /**
-     * Retrieve the internal thread pool associated with the
-     * specified stage name.
-     * @param stageName name of the stage.
-     */
-    public static ExecutorService getStageInternalThreadPool(String stageName)
-    {
-        IStage stage = getStage(stageName);
-        if ( stage == null )
-            throw new IllegalArgumentException("No stage registered with name " + stageName);
-        return stage.getInternalThreadPool();
-    }
-
-    /**
-     * Deregister a stage from StageManager
-     * @param stageName stage name.
-     */
-    public static void deregisterStage(String stageName)
-    {
-        stageQueues_.remove(stageName);
-    }
-
-    /**
-     * This method gets the number of tasks on the
-     * stage's internal queue.
-     * @param stage name of the stage
-     * @return stage task count.
-     */
-    public static long getStageTaskCount(String stage)
-    {
-        return stageQueues_.get(stage).getPendingTasks();
-    }
-
-    /**
      * This method shuts down all registered stages.
      */
     public static void shutdown()
     {
-        Set<String> stages = stageQueues_.keySet();
+        Set<String> stages = stageQueues.keySet();
         for ( String stage : stages )
         {
-            IStage registeredStage = stageQueues_.get(stage);
+            IStage registeredStage = stageQueues.get(stage);
             registeredStage.shutdown();
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jan 11 19:59:58 2010
@@ -280,7 +280,7 @@
     void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
-        assert StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() == 0;
+        assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() == 0;
         int rows = 0;
         for (File file : clogs)
         {
@@ -356,14 +356,14 @@
                         }
                     }
                 };
-                StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
                 rows++;
             }
             reader.close();
         }
 
         // wait for all the writes to finish on the mutation stage
-        while (StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() < rows)
+        while (StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() < rows)
         {
             try
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Mon Jan 11 19:59:58 2010
@@ -55,7 +55,7 @@
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
         return new Message(FBUtilities.getLocalAddress(),
-                           StageManager.readStage_,
+                           StageManager.READ_STAGE,
                            StorageService.rangeVerbHandler_,
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon Jan 11 19:59:58 2010
@@ -100,7 +100,7 @@
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
         return new Message(FBUtilities.getLocalAddress(),
-                           StageManager.readStage_,
+                           StageManager.READ_STAGE,
                            StorageService.rangeSliceVerbHandler_,
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Jan 11 19:59:58 2010
@@ -54,7 +54,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StageManager.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.READ_STAGE, StorageService.readVerbHandler_, bos.toByteArray());
     }
 
     public final QueryPath queryPath;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon Jan 11 19:59:58 2010
@@ -23,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -55,7 +56,7 @@
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(FBUtilities.getLocalAddress(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
+        Message message = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_, bos.toByteArray());
         return message;
     }
 	

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jan 11 19:59:58 2010
@@ -216,7 +216,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, verbHandlerName, bos.toByteArray());
     }
 
     public static RowMutation getRowMutationFromMutations(String keyspace, String key, Map<String, List<Mutation>> cfmap)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Mon Jan 11 19:59:58 2010
@@ -51,7 +51,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         RowMutationMessage.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, verbHandlerName, bos.toByteArray());
     }
     
     @XmlElement(name="RowMutation")

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan 11 19:59:58 2010
@@ -22,7 +22,6 @@
 import java.util.*;
 import java.net.InetAddress;
 
-import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IVerbHandler;
@@ -99,8 +98,6 @@
     }
 
     final static int MAX_GOSSIP_PACKET_SIZE = 1428;
-    /* GS - abbreviation for GOSSIPER_STAGE */
-    final static String GOSSIP_STAGE = "GS";
     /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
     final static String JOIN_VERB_HANDLER = "JVH";
     /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
@@ -153,8 +150,6 @@
         MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
         MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
         MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
-        /* register the Gossip stage */
-        StageManager.registerStage( Gossiper.GOSSIP_STAGE, new SingleThreadedStage("GMFD") );
     }
 
     /** Register with the Gossiper for EndPointState notifications */
@@ -285,7 +280,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
@@ -295,7 +290,7 @@
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
             logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
-        return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
@@ -303,7 +298,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
-        return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
     }
 
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java Mon Jan 11 19:59:58 2010
@@ -55,7 +55,7 @@
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), StageManager.streamStage_, StorageService.streamRequestVerbHandler_, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.streamRequestVerbHandler_, bos.toByteArray() );
     }        
     
     protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Jan 11 19:59:58 2010
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.net.InetAddress;
 
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
 
 public class Message
@@ -120,7 +121,7 @@
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, MessagingService.responseStage_, MessagingService.responseVerbHandler_);
+        Header header = new Header(getMessageId(), from, StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_);
         return new Message(header, args);
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 11 19:59:58 2010
@@ -51,8 +51,6 @@
     private static byte[] protocol_ = new byte[16];
     /* Verb Handler for the Response */
     public static final String responseVerbHandler_ = "RESPONSE";
-    /* Stage for responses. */
-    public static final String responseStage_ = "RESPONSE-STAGE";
 
     /* This records all the results mapped by message Id */
     private static ICachetable<String, IAsyncCallback> callbackMap_;
@@ -84,7 +82,7 @@
     
     private static volatile MessagingService messagingService_ = new MessagingService();
 
-    private static final int MESSAGE_DESERIALIZE_THREADS = 4;
+    public static final int MESSAGE_DESERIALIZE_THREADS = 4;
 
     public static int getVersion()
     {
@@ -121,34 +119,33 @@
          * before the callback is evicted from the table. The concurrency level is set at 128
          * which is the sum of the threads in the pool that adds shit into the table and the 
          * pool that retrives the callback from here.
-        */ 
-        int maxSize = MESSAGE_DESERIALIZE_THREADS;
+        */
         callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
         
-        messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor( maxSize,
-                maxSize,
+        messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor(
+                MESSAGE_DESERIALIZE_THREADS,
+                MESSAGE_DESERIALIZE_THREADS,
                 Integer.MAX_VALUE,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<Runnable>(),
                 new NamedThreadFactory("MESSAGING-SERVICE-POOL")
-                );
+        );
 
-        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor( maxSize,
-                maxSize,
+        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(
+                MESSAGE_DESERIALIZE_THREADS,
+                MESSAGE_DESERIALIZE_THREADS,
                 Integer.MAX_VALUE,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<Runnable>(),
                 new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")
-                ); 
+        );
         
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
                 
         protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());        
         /* register the response verb handler */
         registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
-        /* register stage for response */
-        StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
     }
     
     public byte[] hash(String type, byte data[])

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Mon Jan 11 19:59:58 2010
@@ -19,11 +19,9 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.locks.*;
 import java.net.InetAddress;
 
-import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.StageManager;
 
 class TcpConnectionManager
 {
@@ -49,7 +47,7 @@
      */
     synchronized TcpConnection getConnection(Message msg) throws IOException
     {
-        if (MessagingService.responseStage_.equals(msg.getMessageType()))
+        if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType()))
         {
             if (ackCon == null)
                 ackCon = newCon();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Jan 11 19:59:58 2010
@@ -23,7 +23,6 @@
 import java.util.*;
 import java.util.concurrent.*;
 
-import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
@@ -90,7 +89,6 @@
 {
     private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
 
-    public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
     public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
     public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
 
@@ -137,8 +135,6 @@
      */
     private AntiEntropyService()
     {
-        StageManager.registerStage(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
-
         MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
         MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
         naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
@@ -230,7 +226,7 @@
         for (Differencer differencer : differencers)
         {
             logger.info("Queueing comparison " + differencer);
-            StageManager.getStage(AE_SERVICE_STAGE).execute(differencer);
+            StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(differencer);
         }
     }
 
@@ -487,7 +483,7 @@
                 for (MerkleTree.RowHash minrow : minrows)
                     range.addHash(minrow);
 
-            StageManager.getStage(AE_SERVICE_STAGE).execute(this);
+            StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(this);
             logger.debug("Validated " + validated + " rows into AEService tree for " + cf);
         }
         
@@ -681,7 +677,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(new CFPair(table, cf), dos);
-                return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
+                return new Message(FBUtilities.getLocalAddress(), StageManager.AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {
@@ -739,7 +735,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos);
-                return new Message(local, AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
+                return new Message(local, StageManager.AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Jan 11 19:59:58 2010
@@ -177,7 +177,6 @@
 
 
     private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
-    private static final String lbStage_ = "LOAD-BALANCER-STAGE";
     private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
     /* time to delay in minutes the actual load balance procedure if heavily loaded */
     private static final int delay_ = 5;
@@ -199,7 +198,6 @@
 
     private StorageLoadBalancer()
     {
-        StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
         MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
         Gossiper.instance().register(this);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jan 11 19:59:58 2010
@@ -19,7 +19,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -130,7 +129,7 @@
                                         rm.apply();
                                     }
                                 };
-                                StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                                StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
                             }
                             else
                             {
@@ -270,7 +269,7 @@
                 responseHandler.localResponse();
             }
         };
-        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+        StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
     }
 
     private static int determineBlockFor(int naturalTargets, int hintedTargets, ConsistencyLevel consistency_level)
@@ -497,7 +496,7 @@
         for (ReadCommand command: commands)
         {
             Callable<Object> callable = new weakReadLocalCallable(command);
-            futures.add(StageManager.getStage(StageManager.readStage_).execute(callable));
+            futures.add(StageManager.getStage(StageManager.READ_STAGE).execute(callable));
         }
         for (Future<Object> future : futures)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 11 19:59:58 2010
@@ -1329,7 +1329,7 @@
                     }
                 }
             };
-            StageManager.getStage(StageManager.streamStage_).execute(new Runnable()
+            StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
             {
                 public void run()
                 {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=898035&r1=898034&r2=898035&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon Jan 11 19:59:58 2010
@@ -235,7 +235,7 @@
 
     Future<Object> flushAES()
     {
-        return StageManager.getStage(AE_SERVICE_STAGE).execute(new Callable<Object>(){
+        return StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(new Callable<Object>(){
             public Boolean call()
             {
                 return true;