You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/11 21:14:54 UTC
svn commit: r898049 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Mon Jan 11 20:14:53 2010
New Revision: 898049
URL: http://svn.apache.org/viewvc?rev=898049&view=rev
Log:
inline IStage.executorService, removing the useless Stage wrappers
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Mon Jan 11 20:14:53 2010
@@ -21,6 +21,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.cassandra.net.MessagingService;
@@ -35,7 +39,7 @@
*/
public class StageManager
{
- private static Map<String, IStage> stageQueues = new HashMap<String, IStage>();
+ private static Map<String, ThreadPoolExecutor> stages = new HashMap<String, ThreadPoolExecutor>();
public final static String READ_STAGE = "ROW-READ-STAGE";
public final static String MUTATION_STAGE = "ROW-MUTATION-STAGE";
@@ -47,22 +51,33 @@
static
{
- stageQueues.put(MUTATION_STAGE, new MultiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
- stageQueues.put(READ_STAGE, new MultiThreadedStage(READ_STAGE, getConcurrentReaders()));
- stageQueues.put(STREAM_STAGE, new SingleThreadedStage(STREAM_STAGE));
- stageQueues.put(GOSSIP_STAGE, new SingleThreadedStage("GMFD"));
- stageQueues.put(RESPONSE_STAGE, new MultiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
- stageQueues.put(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
- stageQueues.put(LOADBALANCE_STAGE, new SingleThreadedStage(LOADBALANCE_STAGE));
+ stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
+ stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, getConcurrentReaders()));
+ stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
+ // the rest are all single-threaded
+ stages.put(STREAM_STAGE, new JMXEnabledThreadPoolExecutor(STREAM_STAGE));
+ stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
+ stages.put(AE_SERVICE_STAGE, new JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
+ stages.put(LOADBALANCE_STAGE, new JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
+ }
+
+ private static ThreadPoolExecutor multiThreadedStage(String name, int numThreads)
+ {
+ return new JMXEnabledThreadPoolExecutor(numThreads,
+ numThreads,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory(name));
}
/**
* Retrieve a stage from the StageManager
* @param stageName name of the stage to be retrieved.
*/
- public static IStage getStage(String stageName)
+ public static ThreadPoolExecutor getStage(String stageName)
{
- return stageQueues.get(stageName);
+ return stages.get(stageName);
}
/**
@@ -70,11 +85,10 @@
*/
public static void shutdown()
{
- Set<String> stages = stageQueues.keySet();
- for ( String stage : stages )
+ Set<String> stages = StageManager.stages.keySet();
+ for (String stage : stages)
{
- IStage registeredStage = stageQueues.get(stage);
- registeredStage.shutdown();
+ StageManager.stages.get(stage).shutdown();
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jan 11 20:14:53 2010
@@ -280,7 +280,7 @@
void recover(File[] clogs) throws IOException
{
Set<Table> tablesRecovered = new HashSet<Table>();
- assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() == 0;
+ assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount() == 0;
int rows = 0;
for (File file : clogs)
{
@@ -363,7 +363,7 @@
}
// wait for all the writes to finish on the mutation stage
- while (StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTasks() < rows)
+ while (StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount() < rows)
{
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 11 20:14:53 2010
@@ -493,7 +493,7 @@
private static void enqueueRunnable(String stageName, Runnable runnable){
- IStage stage = StageManager.getStage(stageName);
+ ExecutorService stage = StageManager.getStage(stageName);
if ( stage != null )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Jan 11 20:14:53 2010
@@ -483,7 +483,7 @@
for (MerkleTree.RowHash minrow : minrows)
range.addHash(minrow);
- StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(this);
+ StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
logger.debug("Validated " + validated + " rows into AEService tree for " + cf);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Jan 11 20:14:53 2010
@@ -25,8 +25,6 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jan 11 20:14:53 2010
@@ -496,7 +496,7 @@
for (ReadCommand command: commands)
{
Callable<Object> callable = new weakReadLocalCallable(command);
- futures.add(StageManager.getStage(StageManager.READ_STAGE).execute(callable));
+ futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
}
for (Future<Object> future : futures)
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=898049&r1=898048&r2=898049&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon Jan 11 20:14:53 2010
@@ -235,11 +235,12 @@
Future<Object> flushAES()
{
- return StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(new Callable<Object>(){
+ return StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(new Callable<Object>()
+ {
public Boolean call()
{
return true;
}
- });
+ });
}
}