You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/11 21:14:54 UTC

svn commit: r898049 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Mon Jan 11 20:14:53 2010
New Revision: 898049

URL: http://svn.apache.org/viewvc?rev=898049&view=rev
Log:
inline IStage.executorService, removing the useless Stage wrappers

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Mon Jan 11 20:14:53 2010
@@ -21,6 +21,10 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.net.MessagingService;
 
@@ -35,7 +39,7 @@
  */
 public class StageManager
 {
-    private static Map<String, IStage> stageQueues = new HashMap<String, IStage>();
+    private static Map<String, ThreadPoolExecutor> stages = new HashMap<String, ThreadPoolExecutor>();
 
     public final static String READ_STAGE = "ROW-READ-STAGE";
     public final static String MUTATION_STAGE = "ROW-MUTATION-STAGE";
@@ -47,22 +51,33 @@
 
     static
     {
-        stageQueues.put(MUTATION_STAGE, new MultiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
-        stageQueues.put(READ_STAGE, new MultiThreadedStage(READ_STAGE, getConcurrentReaders()));
-        stageQueues.put(STREAM_STAGE, new SingleThreadedStage(STREAM_STAGE));
-        stageQueues.put(GOSSIP_STAGE, new SingleThreadedStage("GMFD"));
-        stageQueues.put(RESPONSE_STAGE, new MultiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
-        stageQueues.put(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
-        stageQueues.put(LOADBALANCE_STAGE, new SingleThreadedStage(LOADBALANCE_STAGE));
+        stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
+        stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, getConcurrentReaders()));
+        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
+        // the rest are all single-threaded
+        stages.put(STREAM_STAGE, new JMXEnabledThreadPoolExecutor(STREAM_STAGE));
+        stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
+        stages.put(AE_SERVICE_STAGE, new JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
+        stages.put(LOADBALANCE_STAGE, new JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
+    }
+
+    private static ThreadPoolExecutor multiThreadedStage(String name, int numThreads)
+    {
+        return new JMXEnabledThreadPoolExecutor(numThreads,
+                                                numThreads,
+                                                Integer.MAX_VALUE,
+                                                TimeUnit.SECONDS,
+                                                new LinkedBlockingQueue<Runnable>(),
+                                                new NamedThreadFactory(name));
     }
 
     /**
      * Retrieve a stage from the StageManager
      * @param stageName name of the stage to be retrieved.
     */
-    public static IStage getStage(String stageName)
+    public static ThreadPoolExecutor getStage(String stageName)
     {
-        return stageQueues.get(stageName);
+        return stages.get(stageName);
     }
     
     /**
@@ -70,11 +85,10 @@
      */
     public static void shutdown()
     {
-        Set<String> stages = stageQueues.keySet();
-        for ( String stage : stages )
+        Set<String> stages = StageManager.stages.keySet();
+        for (String stage : stages)
         {
-            IStage registeredStage = stageQueues.get(stage);
-            registeredStage.shutdown();
+            StageManager.stages.get(stage).shutdown();
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jan 11 20:14:53 2010
@@ -280,7 +280,7 @@
     void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
-        assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() == 0;
+        assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount() == 0;
         int rows = 0;
         for (File file : clogs)
         {
@@ -363,7 +363,7 @@
         }
 
         // wait for all the writes to finish on the mutation stage
-        while (StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() < rows)
+        while (StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount() < rows)
         {
             try
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 11 20:14:53 2010
@@ -493,7 +493,7 @@
 
     private static void enqueueRunnable(String stageName, Runnable runnable){
         
-        IStage stage = StageManager.getStage(stageName);   
+        ExecutorService stage = StageManager.getStage(stageName);
         
         if ( stage != null )
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Jan 11 20:14:53 2010
@@ -483,7 +483,7 @@
                 for (MerkleTree.RowHash minrow : minrows)
                     range.addHash(minrow);
 
-            StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(this);
+            StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
             logger.debug("Validated " + validated + " rows into AEService tree for " + cf);
         }
         

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Jan 11 20:14:53 2010
@@ -25,8 +25,6 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jan 11 20:14:53 2010
@@ -496,7 +496,7 @@
         for (ReadCommand command: commands)
         {
             Callable<Object> callable = new weakReadLocalCallable(command);
-            futures.add(StageManager.getStage(StageManager.READ_STAGE).execute(callable));
+            futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
         }
         for (Future<Object> future : futures)
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon Jan 11 20:14:53 2010
@@ -235,11 +235,12 @@
 
     Future<Object> flushAES()
     {
-        return StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(new Callable<Object>(){
+        return StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(new Callable<Object>()
+        {
             public Boolean call()
             {
                 return true;
             }
-            });
+        });
     }
 }