You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [2/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/j...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jul 30 15:30:21 2009
@@ -1,128 +1,128 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-import java.lang.management.ManagementFactory;
-
-import org.apache.log4j.Logger;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
- * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected 
- * Runtime Exceptions.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
-{
-    private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
-
-    private ObjectName objName;
-    public DebuggableThreadPoolExecutor(String threadPoolName) 
-    {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(threadPoolName));
-    }
-
-    public DebuggableThreadPoolExecutor(int corePoolSize,
-            int maximumPoolSize,
-            long keepAliveTime,
-            TimeUnit unit,
-            BlockingQueue<Runnable> workQueue,
-            ThreadFactoryImpl threadFactory)
-    {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
-        super.prestartAllCoreThreads();
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id_);
-            mbs.registerMBean(this, objName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public void unregisterMBean()
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.unregisterMBean(objName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public long getPendingTasks()
-    {
-        return getTaskCount() - getCompletedTaskCount();
-    }
-
-    /*
-     * 
-     *  (non-Javadoc)
-     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
-     * Helps us in figuring out why sometimes the threads are getting 
-     * killed and replaced by new ones.
-     */
-    public void afterExecute(Runnable r, Throwable t)
-    {
-        super.afterExecute(r,t);
-
-        if (r instanceof FutureTask) {
-            assert t == null;
-            try
-            {
-                ((FutureTask)r).get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (ExecutionException e)
-            {
-                t = e;
-            }
-        }
-
-        if ( t != null )
-        {  
-            Context ctx = ThreadLocalContext.get();
-            if ( ctx != null )
-            {
-                Object object = ctx.get(r.getClass().getName());
-                
-                if ( object != null )
-                {
-                    logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
-                }
-            }
-            logger_.error("Error in ThreadPoolExecutor", t);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.lang.management.ManagementFactory;
+
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
+ * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected 
+ * Runtime Exceptions.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
+{
+    private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
+
+    private ObjectName objName;
+    public DebuggableThreadPoolExecutor(String threadPoolName) 
+    {
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(threadPoolName));
+    }
+
+    public DebuggableThreadPoolExecutor(int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            ThreadFactoryImpl threadFactory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super.prestartAllCoreThreads();
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id_);
+            mbs.registerMBean(this, objName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public void unregisterMBean()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.unregisterMBean(objName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public long getPendingTasks()
+    {
+        return getTaskCount() - getCompletedTaskCount();
+    }
+
+    /*
+     * 
+     *  (non-Javadoc)
+     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
+     * Helps us in figuring out why sometimes the threads are getting 
+     * killed and replaced by new ones.
+     */
+    public void afterExecute(Runnable r, Throwable t)
+    {
+        super.afterExecute(r,t);
+
+        if (r instanceof FutureTask) {
+            assert t == null;
+            try
+            {
+                ((FutureTask)r).get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                t = e;
+            }
+        }
+
+        if ( t != null )
+        {  
+            Context ctx = ThreadLocalContext.get();
+            if ( ctx != null )
+            {
+                Object object = ctx.get(r.getClass().getName());
+                
+                if ( object != null )
+                {
+                    logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
+                }
+            }
+            logger_.error("Error in ThreadPoolExecutor", t);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java Thu Jul 30 15:30:21 2009
@@ -1,26 +1,26 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import org.apache.commons.javaflow.Continuation;
-
-public interface IContinuable
-{
-    public void run(Continuation c);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.commons.javaflow.Continuation;
+
+public interface IContinuable
+{
+    public void run(Continuation c);
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java Thu Jul 30 15:30:21 2009
@@ -1,120 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-/**
- * An abstraction for stages as described in the SEDA paper by Matt Welsh. 
- * For reference to the paper look over here 
- * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for WellConditioned,
-   Scalable Internet Services</a>.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IStage 
-{
-    /**
-     * Get the name of the associated stage.
-     * @return name of the associated stage.
-     */
-    public String getName();
-    
-    /**
-     * Get the thread pool used by this stage 
-     * internally.
-     */
-    public ExecutorService getInternalThreadPool();
-    
-    /**
-     * This method is used to execute a piece of code on
-     * this stage. The idea is that the <i>run()</i> method
-     * of this Runnable instance is invoked on a thread from a
-     * thread pool that belongs to this stage.
-     * @param runnable instance whose run() method needs to be invoked.
-     */
-    public void execute(Runnable runnable);
-    
-    /**
-     * This method is used to execute a piece of code on
-     * this stage which returns a Future pointer. The idea
-     * is that the <i>call()</i> method of this Runnable 
-     * instance is invoked on a thread from a thread pool 
-     * that belongs to this stage.
-     
-     * @param callable instance that needs to be invoked.
-     * @return the future return object from the callable.
-     */
-    public Future<Object> execute(Callable<Object> callable);
-    
-    /**
-     * This method is used to submit tasks to this stage
-     * that execute periodically. 
-     * 
-     * @param command the task to execute.
-     * @param delay the time to delay first execution 
-     * @param unit the time unit of the initialDelay and period parameters 
-     * @return the future return object from the runnable.
-     */
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 
-      
-    /**
-     * This method is used to submit tasks to this stage
-     * that execute periodically. 
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution
-     * @param period the period between successive executions
-     * @param unit the time unit of the initialDelay and period parameters 
-     * @return the future return object from the runnable.
-     */
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 
-    
-    /**
-     * This method is used to submit tasks to this stage
-     * that execute periodically. 
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution
-     * @param delay  the delay between the termination of one execution and the commencement of the next.
-     * @param unit the time unit of the initialDelay and delay parameters 
-     * @return the future return object from the runnable.
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
-    
-    /**
-     * Shutdown the stage. All the threads of this stage
-     * are forcefully shutdown. Any pending tasks on this
-     * stage could be dropped or the stage could wait for 
-     * these tasks to be completed. This is however an 
-     * implementation detail.
-     */
-    public void shutdown();  
-    
-    /**
-     * Checks if the stage has been shutdown.
-     * @return true if shut down, otherwise false.
-     */
-    public boolean isShutdown();
-    
-    /**
-     * This method returns the number of tasks that are 
-     * pending on this stage to be executed.
-     * @return task count.
-     */
-    public long getPendingTasks();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * An abstraction for stages as described in the SEDA paper by Matt Welsh. 
+ * For reference to the paper look over here 
+ * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for WellConditioned,
+   Scalable Internet Services</a>.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStage 
+{
+    /**
+     * Get the name of the associated stage.
+     * @return name of the associated stage.
+     */
+    public String getName();
+    
+    /**
+     * Get the thread pool used by this stage 
+     * internally.
+     */
+    public ExecutorService getInternalThreadPool();
+    
+    /**
+     * This method is used to execute a piece of code on
+     * this stage. The idea is that the <i>run()</i> method
+     * of this Runnable instance is invoked on a thread from a
+     * thread pool that belongs to this stage.
+     * @param runnable instance whose run() method needs to be invoked.
+     */
+    public void execute(Runnable runnable);
+    
+    /**
+     * This method is used to execute a piece of code on
+     * this stage which returns a Future pointer. The idea
+     * is that the <i>call()</i> method of this Runnable 
+     * instance is invoked on a thread from a thread pool 
+     * that belongs to this stage.
+     
+     * @param callable instance that needs to be invoked.
+     * @return the future return object from the callable.
+     */
+    public Future<Object> execute(Callable<Object> callable);
+    
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute periodically. 
+     * 
+     * @param command the task to execute.
+     * @param delay the time to delay first execution 
+     * @param unit the time unit of the initialDelay and period parameters 
+     * @return the future return object from the runnable.
+     */
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 
+      
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute periodically. 
+     * @param command the task to execute.
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters 
+     * @return the future return object from the runnable.
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 
+    
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute periodically. 
+     * @param command the task to execute.
+     * @param initialDelay the time to delay first execution
+     * @param delay  the delay between the termination of one execution and the commencement of the next.
+     * @param unit the time unit of the initialDelay and delay parameters 
+     * @return the future return object from the runnable.
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
+    
+    /**
+     * Shutdown the stage. All the threads of this stage
+     * are forcefully shutdown. Any pending tasks on this
+     * stage could be dropped or the stage could wait for 
+     * these tasks to be completed. This is however an 
+     * implementation detail.
+     */
+    public void shutdown();  
+    
+    /**
+     * Checks if the stage has been shutdown.
+     * @return true if shut down, otherwise false.
+     */
+    public boolean isShutdown();
+    
+    /**
+     * This method returns the number of tasks that are 
+     * pending on this stage to be executed.
+     * @return task count.
+     */
+    public long getPendingTasks();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java Thu Jul 30 15:30:21 2009
@@ -1,98 +1,98 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-import javax.naming.OperationNotSupportedException;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * This class is an implementation of the <i>IStage</i> interface. In particular
- * it is for a stage that has a thread pool with multiple threads. For details 
- * please refer to the <i>IStage</i> documentation.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MultiThreadedStage implements IStage 
-{    
-    private String name_;
-    private DebuggableThreadPoolExecutor executorService_;
-            
-    public MultiThreadedStage(String name, int numThreads)
-    {        
-        name_ = name;        
-        executorService_ = new DebuggableThreadPoolExecutor( numThreads,
-                numThreads,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new ThreadFactoryImpl(name)
-                );        
-    }
-    
-    public String getName() 
-    {        
-        return name_;
-    }    
-    
-    public ExecutorService getInternalThreadPool()
-    {
-        return executorService_;
-    }
-
-    public Future<Object> execute(Callable<Object> callable) {
-        return executorService_.submit(callable);
-    }
-    
-    public void execute(Runnable runnable) {
-        executorService_.execute(runnable);
-    }
-    
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-    {
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public void shutdown() {  
-        executorService_.shutdownNow(); 
-    }
-    
-    public boolean isShutdown()
-    {
-        return executorService_.isShutdown();
-    }
-    
-    public long getPendingTasks(){
-        return executorService_.getPendingTasks();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with multiple threads. For details 
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiThreadedStage implements IStage 
+{    
+    private String name_;
+    private DebuggableThreadPoolExecutor executorService_;
+            
+    public MultiThreadedStage(String name, int numThreads)
+    {        
+        name_ = name;        
+        executorService_ = new DebuggableThreadPoolExecutor( numThreads,
+                numThreads,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+    }
+    
+    public String getName() 
+    {        
+        return name_;
+    }    
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+
+    public Future<Object> execute(Callable<Object> callable) {
+        return executorService_.submit(callable);
+    }
+    
+    public void execute(Runnable runnable) {
+        executorService_.execute(runnable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown() {  
+        executorService_.shutdownNow(); 
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }
+    
+    public long getPendingTasks(){
+        return executorService_.getPendingTasks();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java Thu Jul 30 15:30:21 2009
@@ -1,24 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-interface RejectedExecutionHandler
-{
-    public void rejectedExecution(Runnable r, ContinuationsExecutor executor); 
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+interface RejectedExecutionHandler
+{
+    public void rejectedExecution(Runnable r, ContinuationsExecutor executor); 
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java Thu Jul 30 15:30:21 2009
@@ -1,100 +1,100 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-public class SingleThreadedContinuationStage implements IStage 
-{
-    protected ContinuationsExecutor executorService_;
-    private String name_;
-
-    public SingleThreadedContinuationStage(String name)
-    {        
-        executorService_ = new ContinuationsExecutor( 1,
-                1,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new ThreadFactoryImpl(name)
-                );        
-        name_ = name;        
-    }
-    
-    /* Implementing the IStage interface methods */
-    
-    public String getName()
-    {
-        return name_;
-    }
-    
-    public ExecutorService getInternalThreadPool()
-    {
-        return executorService_;
-    }
-    
-    public void execute(Runnable runnable)
-    {
-        executorService_.execute(runnable);
-    }
-    
-    public Future<Object> execute(Callable<Object> callable)
-    {
-        return executorService_.submit(callable);
-    }
-    
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-    {
-        //return executorService_.schedule(command, delay, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
-    {
-        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
-    {
-        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public void shutdown()
-    {
-        executorService_.shutdownNow();
-    }
-    
-    public boolean isShutdown()
-    {
-        return executorService_.isShutdown();
-    }    
-    
-    public long getPendingTasks(){
-        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
-    }
-    /* Finished implementing the IStage interface methods */
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SingleThreadedContinuationStage implements IStage 
+{
+    protected ContinuationsExecutor executorService_;
+    private String name_;
+
+    public SingleThreadedContinuationStage(String name)
+    {        
+        executorService_ = new ContinuationsExecutor( 1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+        name_ = name;        
+    }
+    
+    /* Implementing the IStage interface methods */
+    
+    public String getName()
+    {
+        return name_;
+    }
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+    
+    public void execute(Runnable runnable)
+    {
+        executorService_.execute(runnable);
+    }
+    
+    public Future<Object> execute(Callable<Object> callable)
+    {
+        return executorService_.submit(callable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        //return executorService_.schedule(command, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown()
+    {
+        executorService_.shutdownNow();
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }    
+    
+    public long getPendingTasks(){
+        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+    }
+    /* Finished implementing the IStage interface methods */
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java Thu Jul 30 15:30:21 2009
@@ -1,100 +1,100 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.net.*;
-
-/**
- * This class is an implementation of the <i>IStage</i> interface. In particular
- * it is for a stage that has a thread pool with a single thread. For details 
- * please refer to the <i>IStage</i> documentation.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class SingleThreadedStage implements IStage 
-{
-    protected DebuggableThreadPoolExecutor executorService_;
-    private String name_;
-
-	public SingleThreadedStage(String name)
-    {
-        executorService_ = new DebuggableThreadPoolExecutor(name);
-        name_ = name;
-	}
-	
-    /* Implementing the IStage interface methods */
-    
-    public String getName()
-    {
-        return name_;
-    }
-    
-    public ExecutorService getInternalThreadPool()
-    {
-        return executorService_;
-    }
-    
-    public void execute(Runnable runnable)
-    {
-        executorService_.execute(runnable);
-    }
-    
-    public Future<Object> execute(Callable<Object> callable)
-    {
-        return executorService_.submit(callable);
-    }
-    
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-    {
-        //return executorService_.schedule(command, delay, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
-    {
-        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
-    {
-        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
-        throw new UnsupportedOperationException("This operation is not supported");
-    }
-    
-    public void shutdown()
-    {
-        executorService_.shutdownNow();
-    }
-    
-    public boolean isShutdown()
-    {
-        return executorService_.isShutdown();
-    }    
-    
-    public long getPendingTasks(){
-        return executorService_.getPendingTasks();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.net.*;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with a single thread. For details 
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SingleThreadedStage implements IStage 
+{
+    protected DebuggableThreadPoolExecutor executorService_;
+    private String name_;
+
+	public SingleThreadedStage(String name)
+    {
+        executorService_ = new DebuggableThreadPoolExecutor(name);
+        name_ = name;
+	}
+	
+    /* Implementing the IStage interface methods */
+    
+    public String getName()
+    {
+        return name_;
+    }
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+    
+    public void execute(Runnable runnable)
+    {
+        executorService_.execute(runnable);
+    }
+    
+    public Future<Object> execute(Callable<Object> callable)
+    {
+        return executorService_.submit(callable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        //return executorService_.schedule(command, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown()
+    {
+        executorService_.shutdownNow();
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }    
+    
+    public long getPendingTasks(){
+        return executorService_.getPendingTasks();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Thu Jul 30 15:30:21 2009
@@ -1,117 +1,117 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-
-/**
- * This class manages all stages that exist within a process. The application registers
- * and de-registers stages with this abstraction. Any component that has the <i>ID</i> 
- * associated with a stage can obtain a handle to actual stage.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class StageManager
-{
-    private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
-    
-    /**
-     * Register a stage with the StageManager
-     * @param stageName stage name.
-     * @param stage stage for the respective message types.
-     */
-    public static void registerStage(String stageName, IStage stage)
-    {
-        stageQueues_.put(stageName, stage);
-    }
-    
-    /**
-     * Returns the stage that we are currently executing on.
-     * This relies on the fact that the thread names in the
-     * stage have the name of the stage as the prefix.
-     * @return Returns the stage that we are currently executing on.
-     */
-    public static IStage getCurrentStage()
-    {
-        String name = Thread.currentThread().getName();
-        String[] peices = name.split(":");
-        IStage stage = getStage(peices[0]);
-        return stage;
-    }
-
-    /**
-     * Retrieve a stage from the StageManager
-     * @param stageName name of the stage to be retrieved.
-    */
-    public static IStage getStage(String stageName)
-    {
-        return stageQueues_.get(stageName);
-    }
-    
-    /**
-     * Retrieve the internal thread pool associated with the
-     * specified stage name.
-     * @param stageName name of the stage.
-     */
-    public static ExecutorService getStageInternalThreadPool(String stageName)
-    {
-        IStage stage = getStage(stageName);
-        if ( stage == null )
-            throw new IllegalArgumentException("No stage registered with name " + stageName);
-        return stage.getInternalThreadPool();
-    }
-
-    /**
-     * Deregister a stage from StageManager
-     * @param stageName stage name.
-     */
-    public static void deregisterStage(String stageName)
-    {
-        stageQueues_.remove(stageName);
-    }
-
-    /**
-     * This method gets the number of tasks on the
-     * stage's internal queue.
-     * @param stage name of the stage
-     * @return stage task count.
-     */
-    public static long getStageTaskCount(String stage)
-    {
-        return stageQueues_.get(stage).getPendingTasks();
-    }
-
-    /**
-     * This method shuts down all registered stages.
-     */
-    public static void shutdown()
-    {
-        Set<String> stages = stageQueues_.keySet();
-        for ( String stage : stages )
-        {
-            IStage registeredStage = stageQueues_.get(stage);
-            registeredStage.shutdown();
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * This class manages all stages that exist within a process. The application registers
+ * and de-registers stages with this abstraction. Any component that has the <i>ID</i> 
+ * associated with a stage can obtain a handle to actual stage.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StageManager
+{
+    private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+    
+    /**
+     * Register a stage with the StageManager
+     * @param stageName stage name.
+     * @param stage stage for the respective message types.
+     */
+    public static void registerStage(String stageName, IStage stage)
+    {
+        stageQueues_.put(stageName, stage);
+    }
+    
+    /**
+     * Returns the stage that we are currently executing on.
+     * This relies on the fact that the thread names in the
+     * stage have the name of the stage as the prefix.
+     * @return Returns the stage that we are currently executing on.
+     */
+    public static IStage getCurrentStage()
+    {
+        String name = Thread.currentThread().getName();
+        String[] peices = name.split(":");
+        IStage stage = getStage(peices[0]);
+        return stage;
+    }
+
+    /**
+     * Retrieve a stage from the StageManager
+     * @param stageName name of the stage to be retrieved.
+    */
+    public static IStage getStage(String stageName)
+    {
+        return stageQueues_.get(stageName);
+    }
+    
+    /**
+     * Retrieve the internal thread pool associated with the
+     * specified stage name.
+     * @param stageName name of the stage.
+     */
+    public static ExecutorService getStageInternalThreadPool(String stageName)
+    {
+        IStage stage = getStage(stageName);
+        if ( stage == null )
+            throw new IllegalArgumentException("No stage registered with name " + stageName);
+        return stage.getInternalThreadPool();
+    }
+
+    /**
+     * Deregister a stage from StageManager
+     * @param stageName stage name.
+     */
+    public static void deregisterStage(String stageName)
+    {
+        stageQueues_.remove(stageName);
+    }
+
+    /**
+     * This method gets the number of tasks on the
+     * stage's internal queue.
+     * @param stage name of the stage
+     * @return stage task count.
+     */
+    public static long getStageTaskCount(String stage)
+    {
+        return stageQueues_.get(stage).getPendingTasks();
+    }
+
+    /**
+     * This method shuts down all registered stages.
+     */
+    public static void shutdown()
+    {
+        Set<String> stages = stageQueues_.keySet();
+        for ( String stage : stages )
+        {
+            IStage registeredStage = stageQueues_.get(stage);
+            registeredStage.shutdown();
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java Thu Jul 30 15:30:21 2009
@@ -1,51 +1,51 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * This class is an implementation of the <i>ThreadFactory</i> interface. This 
- * is useful to give Java threads meaningful names which is useful when using 
- * a tool like JConsole.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ThreadFactoryImpl implements ThreadFactory
-{
-    protected String id_;
-    protected ThreadGroup threadGroup_;
-    protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
-    
-    public ThreadFactoryImpl(String id)
-    {
-        SecurityManager sm = System.getSecurityManager();
-        threadGroup_ = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
-        id_ = id;
-    }    
-    
-    public Thread newThread(Runnable runnable)
-    {        
-        String name = id_ + ":" + threadNbr_.getAndIncrement();       
-        Thread thread = new Thread(threadGroup_, runnable, name);        
-        return thread;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class is an implementation of the <i>ThreadFactory</i> interface. This 
+ * is useful to give Java threads meaningful names which is useful when using 
+ * a tool like JConsole.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ThreadFactoryImpl implements ThreadFactory
+{
+    protected String id_;
+    protected ThreadGroup threadGroup_;
+    protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
+    
+    public ThreadFactoryImpl(String id)
+    {
+        SecurityManager sm = System.getSecurityManager();
+        threadGroup_ = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        id_ = id;
+    }    
+    
+    public Thread newThread(Runnable runnable)
+    {        
+        String name = id_ + ":" + threadNbr_.getAndIncrement();       
+        Thread thread = new Thread(threadGroup_, runnable, name);        
+        return thread;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java Thu Jul 30 15:30:21 2009
@@ -1,42 +1,42 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-/**
- * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when 
- * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-
-public class ThreadLocalContext
-{
-    private static InheritableThreadLocal<Context> tls_ = new InheritableThreadLocal<Context>();
-
-    public static void put(Context value)
-    {
-        tls_.set(value);
-    }
-
-    public static Context get()
-    {
-        return tls_.get();
-    }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+/**
+ * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when 
+ * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+public class ThreadLocalContext
+{
+    private static InheritableThreadLocal<Context> tls_ = new InheritableThreadLocal<Context>();
+
+    public static void put(Context value)
+    {
+        tls_.set(value);
+    }
+
+    public static Context get()
+    {
+        return tls_.get();
+    }
 }
\ No newline at end of file