You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [2/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/j...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jul 30 15:30:21 2009
@@ -1,128 +1,128 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-import java.lang.management.ManagementFactory;
-
-import org.apache.log4j.Logger;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
- * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected
- * Runtime Exceptions.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
-{
- private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
-
- private ObjectName objName;
- public DebuggableThreadPoolExecutor(String threadPoolName)
- {
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(threadPoolName));
- }
-
- public DebuggableThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactoryImpl threadFactory)
- {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- super.prestartAllCoreThreads();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id_);
- mbs.registerMBean(this, objName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void unregisterMBean()
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.unregisterMBean(objName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public long getPendingTasks()
- {
- return getTaskCount() - getCompletedTaskCount();
- }
-
- /*
- *
- * (non-Javadoc)
- * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
- * Helps us in figuring out why sometimes the threads are getting
- * killed and replaced by new ones.
- */
- public void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r,t);
-
- if (r instanceof FutureTask) {
- assert t == null;
- try
- {
- ((FutureTask)r).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- t = e;
- }
- }
-
- if ( t != null )
- {
- Context ctx = ThreadLocalContext.get();
- if ( ctx != null )
- {
- Object object = ctx.get(r.getClass().getName());
-
- if ( object != null )
- {
- logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
- }
- }
- logger_.error("Error in ThreadPoolExecutor", t);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.lang.management.ManagementFactory;
+
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
+ * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected
+ * Runtime Exceptions.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
+{
+ private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
+
+ private ObjectName objName;
+ public DebuggableThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(threadPoolName));
+ }
+
+ public DebuggableThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactoryImpl threadFactory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ super.prestartAllCoreThreads();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id_);
+ mbs.registerMBean(this, objName);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void unregisterMBean()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.unregisterMBean(objName);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getPendingTasks()
+ {
+ return getTaskCount() - getCompletedTaskCount();
+ }
+
+ /*
+ *
+ * (non-Javadoc)
+ * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
+ * Helps us in figuring out why sometimes the threads are getting
+ * killed and replaced by new ones.
+ */
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r,t);
+
+ if (r instanceof FutureTask) {
+ assert t == null;
+ try
+ {
+ ((FutureTask)r).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ t = e;
+ }
+ }
+
+ if ( t != null )
+ {
+ Context ctx = ThreadLocalContext.get();
+ if ( ctx != null )
+ {
+ Object object = ctx.get(r.getClass().getName());
+
+ if ( object != null )
+ {
+ logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
+ }
+ }
+ logger_.error("Error in ThreadPoolExecutor", t);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IContinuable.java Thu Jul 30 15:30:21 2009
@@ -1,26 +1,26 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import org.apache.commons.javaflow.Continuation;
-
-public interface IContinuable
-{
- public void run(Continuation c);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.commons.javaflow.Continuation;
+
+public interface IContinuable
+{
+ public void run(Continuation c);
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java Thu Jul 30 15:30:21 2009
@@ -1,120 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-/**
- * An abstraction for stages as described in the SEDA paper by Matt Welsh.
- * For reference to the paper look over here
- * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for WellConditioned,
- Scalable Internet Services</a>.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IStage
-{
- /**
- * Get the name of the associated stage.
- * @return name of the associated stage.
- */
- public String getName();
-
- /**
- * Get the thread pool used by this stage
- * internally.
- */
- public ExecutorService getInternalThreadPool();
-
- /**
- * This method is used to execute a piece of code on
- * this stage. The idea is that the <i>run()</i> method
- * of this Runnable instance is invoked on a thread from a
- * thread pool that belongs to this stage.
- * @param runnable instance whose run() method needs to be invoked.
- */
- public void execute(Runnable runnable);
-
- /**
- * This method is used to execute a piece of code on
- * this stage which returns a Future pointer. The idea
- * is that the <i>call()</i> method of this Runnable
- * instance is invoked on a thread from a thread pool
- * that belongs to this stage.
-
- * @param callable instance that needs to be invoked.
- * @return the future return object from the callable.
- */
- public Future<Object> execute(Callable<Object> callable);
-
- /**
- * This method is used to submit tasks to this stage
- * that execute periodically.
- *
- * @param command the task to execute.
- * @param delay the time to delay first execution
- * @param unit the time unit of the initialDelay and period parameters
- * @return the future return object from the runnable.
- */
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
-
- /**
- * This method is used to submit tasks to this stage
- * that execute periodically.
- * @param command the task to execute.
- * @param initialDelay the time to delay first execution
- * @param period the period between successive executions
- * @param unit the time unit of the initialDelay and period parameters
- * @return the future return object from the runnable.
- */
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
-
- /**
- * This method is used to submit tasks to this stage
- * that execute periodically.
- * @param command the task to execute.
- * @param initialDelay the time to delay first execution
- * @param delay the delay between the termination of one execution and the commencement of the next.
- * @param unit the time unit of the initialDelay and delay parameters
- * @return the future return object from the runnable.
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
-
- /**
- * Shutdown the stage. All the threads of this stage
- * are forcefully shutdown. Any pending tasks on this
- * stage could be dropped or the stage could wait for
- * these tasks to be completed. This is however an
- * implementation detail.
- */
- public void shutdown();
-
- /**
- * Checks if the stage has been shutdown.
- * @return true if shut down, otherwise false.
- */
- public boolean isShutdown();
-
- /**
- * This method returns the number of tasks that are
- * pending on this stage to be executed.
- * @return task count.
- */
- public long getPendingTasks();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * An abstraction for stages as described in the SEDA paper by Matt Welsh.
+ * For reference to the paper look over here
+ * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for WellConditioned,
+ Scalable Internet Services</a>.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStage
+{
+ /**
+ * Get the name of the associated stage.
+ * @return name of the associated stage.
+ */
+ public String getName();
+
+ /**
+ * Get the thread pool used by this stage
+ * internally.
+ */
+ public ExecutorService getInternalThreadPool();
+
+ /**
+ * This method is used to execute a piece of code on
+ * this stage. The idea is that the <i>run()</i> method
+ * of this Runnable instance is invoked on a thread from a
+ * thread pool that belongs to this stage.
+ * @param runnable instance whose run() method needs to be invoked.
+ */
+ public void execute(Runnable runnable);
+
+ /**
+ * This method is used to execute a piece of code on
+ * this stage which returns a Future pointer. The idea
+ * is that the <i>call()</i> method of this Runnable
+ * instance is invoked on a thread from a thread pool
+ * that belongs to this stage.
+
+ * @param callable instance that needs to be invoked.
+ * @return the future return object from the callable.
+ */
+ public Future<Object> execute(Callable<Object> callable);
+
+ /**
+ * This method is used to submit tasks to this stage
+ * that execute periodically.
+ *
+ * @param command the task to execute.
+ * @param delay the time to delay first execution
+ * @param unit the time unit of the initialDelay and period parameters
+ * @return the future return object from the runnable.
+ */
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
+
+ /**
+ * This method is used to submit tasks to this stage
+ * that execute periodically.
+ * @param command the task to execute.
+ * @param initialDelay the time to delay first execution
+ * @param period the period between successive executions
+ * @param unit the time unit of the initialDelay and period parameters
+ * @return the future return object from the runnable.
+ */
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
+
+ /**
+ * This method is used to submit tasks to this stage
+ * that execute periodically.
+ * @param command the task to execute.
+ * @param initialDelay the time to delay first execution
+ * @param delay the delay between the termination of one execution and the commencement of the next.
+ * @param unit the time unit of the initialDelay and delay parameters
+ * @return the future return object from the runnable.
+ */
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
+
+ /**
+ * Shutdown the stage. All the threads of this stage
+ * are forcefully shutdown. Any pending tasks on this
+ * stage could be dropped or the stage could wait for
+ * these tasks to be completed. This is however an
+ * implementation detail.
+ */
+ public void shutdown();
+
+ /**
+ * Checks if the stage has been shutdown.
+ * @return true if shut down, otherwise false.
+ */
+ public boolean isShutdown();
+
+ /**
+ * This method returns the number of tasks that are
+ * pending on this stage to be executed.
+ * @return task count.
+ */
+ public long getPendingTasks();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java Thu Jul 30 15:30:21 2009
@@ -1,98 +1,98 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-import javax.naming.OperationNotSupportedException;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * This class is an implementation of the <i>IStage</i> interface. In particular
- * it is for a stage that has a thread pool with multiple threads. For details
- * please refer to the <i>IStage</i> documentation.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MultiThreadedStage implements IStage
-{
- private String name_;
- private DebuggableThreadPoolExecutor executorService_;
-
- public MultiThreadedStage(String name, int numThreads)
- {
- name_ = name;
- executorService_ = new DebuggableThreadPoolExecutor( numThreads,
- numThreads,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl(name)
- );
- }
-
- public String getName()
- {
- return name_;
- }
-
- public ExecutorService getInternalThreadPool()
- {
- return executorService_;
- }
-
- public Future<Object> execute(Callable<Object> callable) {
- return executorService_.submit(callable);
- }
-
- public void execute(Runnable runnable) {
- executorService_.execute(runnable);
- }
-
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- {
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public void shutdown() {
- executorService_.shutdownNow();
- }
-
- public boolean isShutdown()
- {
- return executorService_.isShutdown();
- }
-
- public long getPendingTasks(){
- return executorService_.getPendingTasks();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with multiple threads. For details
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiThreadedStage implements IStage
+{
+ private String name_;
+ private DebuggableThreadPoolExecutor executorService_;
+
+ public MultiThreadedStage(String name, int numThreads)
+ {
+ name_ = name;
+ executorService_ = new DebuggableThreadPoolExecutor( numThreads,
+ numThreads,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl(name)
+ );
+ }
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public Future<Object> execute(Callable<Object> callable) {
+ return executorService_.submit(callable);
+ }
+
+ public void execute(Runnable runnable) {
+ executorService_.execute(runnable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown() {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getPendingTasks(){
+ return executorService_.getPendingTasks();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RejectedExecutionHandler.java Thu Jul 30 15:30:21 2009
@@ -1,24 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-interface RejectedExecutionHandler
-{
- public void rejectedExecution(Runnable r, ContinuationsExecutor executor);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+interface RejectedExecutionHandler
+{
+ public void rejectedExecution(Runnable r, ContinuationsExecutor executor);
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java Thu Jul 30 15:30:21 2009
@@ -1,100 +1,100 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-public class SingleThreadedContinuationStage implements IStage
-{
- protected ContinuationsExecutor executorService_;
- private String name_;
-
- public SingleThreadedContinuationStage(String name)
- {
- executorService_ = new ContinuationsExecutor( 1,
- 1,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl(name)
- );
- name_ = name;
- }
-
- /* Implementing the IStage interface methods */
-
- public String getName()
- {
- return name_;
- }
-
- public ExecutorService getInternalThreadPool()
- {
- return executorService_;
- }
-
- public void execute(Runnable runnable)
- {
- executorService_.execute(runnable);
- }
-
- public Future<Object> execute(Callable<Object> callable)
- {
- return executorService_.submit(callable);
- }
-
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- {
- //return executorService_.schedule(command, delay, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- {
- //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- {
- //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public void shutdown()
- {
- executorService_.shutdownNow();
- }
-
- public boolean isShutdown()
- {
- return executorService_.isShutdown();
- }
-
- public long getPendingTasks(){
- return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
- }
- /* Finished implementing the IStage interface methods */
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SingleThreadedContinuationStage implements IStage
+{
+ protected ContinuationsExecutor executorService_;
+ private String name_;
+
+ public SingleThreadedContinuationStage(String name)
+ {
+ executorService_ = new ContinuationsExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl(name)
+ );
+ name_ = name;
+ }
+
+ /* Implementing the IStage interface methods */
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public void execute(Runnable runnable)
+ {
+ executorService_.execute(runnable);
+ }
+
+ public Future<Object> execute(Callable<Object> callable)
+ {
+ return executorService_.submit(callable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ //return executorService_.schedule(command, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown()
+ {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getPendingTasks(){
+ return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ }
+ /* Finished implementing the IStage interface methods */
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java Thu Jul 30 15:30:21 2009
@@ -1,100 +1,100 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.net.*;
-
-/**
- * This class is an implementation of the <i>IStage</i> interface. In particular
- * it is for a stage that has a thread pool with a single thread. For details
- * please refer to the <i>IStage</i> documentation.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class SingleThreadedStage implements IStage
-{
- protected DebuggableThreadPoolExecutor executorService_;
- private String name_;
-
- public SingleThreadedStage(String name)
- {
- executorService_ = new DebuggableThreadPoolExecutor(name);
- name_ = name;
- }
-
- /* Implementing the IStage interface methods */
-
- public String getName()
- {
- return name_;
- }
-
- public ExecutorService getInternalThreadPool()
- {
- return executorService_;
- }
-
- public void execute(Runnable runnable)
- {
- executorService_.execute(runnable);
- }
-
- public Future<Object> execute(Callable<Object> callable)
- {
- return executorService_.submit(callable);
- }
-
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- {
- //return executorService_.schedule(command, delay, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- {
- //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- {
- //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
- throw new UnsupportedOperationException("This operation is not supported");
- }
-
- public void shutdown()
- {
- executorService_.shutdownNow();
- }
-
- public boolean isShutdown()
- {
- return executorService_.isShutdown();
- }
-
- public long getPendingTasks(){
- return executorService_.getPendingTasks();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.net.*;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with a single thread. For details
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SingleThreadedStage implements IStage
+{
+ protected DebuggableThreadPoolExecutor executorService_;
+ private String name_;
+
+ public SingleThreadedStage(String name)
+ {
+ executorService_ = new DebuggableThreadPoolExecutor(name);
+ name_ = name;
+ }
+
+ /* Implementing the IStage interface methods */
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public void execute(Runnable runnable)
+ {
+ executorService_.execute(runnable);
+ }
+
+ public Future<Object> execute(Callable<Object> callable)
+ {
+ return executorService_.submit(callable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ //return executorService_.schedule(command, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown()
+ {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getPendingTasks(){
+ return executorService_.getPendingTasks();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Thu Jul 30 15:30:21 2009
@@ -1,117 +1,117 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-
-/**
- * This class manages all stages that exist within a process. The application registers
- * and de-registers stages with this abstraction. Any component that has the <i>ID</i>
- * associated with a stage can obtain a handle to actual stage.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class StageManager
-{
- private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
-
- /**
- * Register a stage with the StageManager
- * @param stageName stage name.
- * @param stage stage for the respective message types.
- */
- public static void registerStage(String stageName, IStage stage)
- {
- stageQueues_.put(stageName, stage);
- }
-
- /**
- * Returns the stage that we are currently executing on.
- * This relies on the fact that the thread names in the
- * stage have the name of the stage as the prefix.
- * @return Returns the stage that we are currently executing on.
- */
- public static IStage getCurrentStage()
- {
- String name = Thread.currentThread().getName();
- String[] peices = name.split(":");
- IStage stage = getStage(peices[0]);
- return stage;
- }
-
- /**
- * Retrieve a stage from the StageManager
- * @param stageName name of the stage to be retrieved.
- */
- public static IStage getStage(String stageName)
- {
- return stageQueues_.get(stageName);
- }
-
- /**
- * Retrieve the internal thread pool associated with the
- * specified stage name.
- * @param stageName name of the stage.
- */
- public static ExecutorService getStageInternalThreadPool(String stageName)
- {
- IStage stage = getStage(stageName);
- if ( stage == null )
- throw new IllegalArgumentException("No stage registered with name " + stageName);
- return stage.getInternalThreadPool();
- }
-
- /**
- * Deregister a stage from StageManager
- * @param stageName stage name.
- */
- public static void deregisterStage(String stageName)
- {
- stageQueues_.remove(stageName);
- }
-
- /**
- * This method gets the number of tasks on the
- * stage's internal queue.
- * @param stage name of the stage
- * @return stage task count.
- */
- public static long getStageTaskCount(String stage)
- {
- return stageQueues_.get(stage).getPendingTasks();
- }
-
- /**
- * This method shuts down all registered stages.
- */
- public static void shutdown()
- {
- Set<String> stages = stageQueues_.keySet();
- for ( String stage : stages )
- {
- IStage registeredStage = stageQueues_.get(stage);
- registeredStage.shutdown();
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * This class manages all stages that exist within a process. The application registers
+ * and de-registers stages with this abstraction. Any component that has the <i>ID</i>
+ * associated with a stage can obtain a handle to actual stage.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StageManager
+{
+ private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+
+ /**
+ * Register a stage with the StageManager
+ * @param stageName stage name.
+ * @param stage stage for the respective message types.
+ */
+ public static void registerStage(String stageName, IStage stage)
+ {
+ stageQueues_.put(stageName, stage);
+ }
+
+ /**
+ * Returns the stage that we are currently executing on.
+ * This relies on the fact that the thread names in the
+ * stage have the name of the stage as the prefix.
+ * @return Returns the stage that we are currently executing on.
+ */
+ public static IStage getCurrentStage()
+ {
+ String name = Thread.currentThread().getName();
+ String[] peices = name.split(":");
+ IStage stage = getStage(peices[0]);
+ return stage;
+ }
+
+ /**
+ * Retrieve a stage from the StageManager
+ * @param stageName name of the stage to be retrieved.
+ */
+ public static IStage getStage(String stageName)
+ {
+ return stageQueues_.get(stageName);
+ }
+
+ /**
+ * Retrieve the internal thread pool associated with the
+ * specified stage name.
+ * @param stageName name of the stage.
+ */
+ public static ExecutorService getStageInternalThreadPool(String stageName)
+ {
+ IStage stage = getStage(stageName);
+ if ( stage == null )
+ throw new IllegalArgumentException("No stage registered with name " + stageName);
+ return stage.getInternalThreadPool();
+ }
+
+ /**
+ * Deregister a stage from StageManager
+ * @param stageName stage name.
+ */
+ public static void deregisterStage(String stageName)
+ {
+ stageQueues_.remove(stageName);
+ }
+
+ /**
+ * This method gets the number of tasks on the
+ * stage's internal queue.
+ * @param stage name of the stage
+ * @return stage task count.
+ */
+ public static long getStageTaskCount(String stage)
+ {
+ return stageQueues_.get(stage).getPendingTasks();
+ }
+
+ /**
+ * This method shuts down all registered stages.
+ */
+ public static void shutdown()
+ {
+ Set<String> stages = stageQueues_.keySet();
+ for ( String stage : stages )
+ {
+ IStage registeredStage = stageQueues_.get(stage);
+ registeredStage.shutdown();
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadFactoryImpl.java Thu Jul 30 15:30:21 2009
@@ -1,51 +1,51 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * This class is an implementation of the <i>ThreadFactory</i> interface. This
- * is useful to give Java threads meaningful names which is useful when using
- * a tool like JConsole.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ThreadFactoryImpl implements ThreadFactory
-{
- protected String id_;
- protected ThreadGroup threadGroup_;
- protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
-
- public ThreadFactoryImpl(String id)
- {
- SecurityManager sm = System.getSecurityManager();
- threadGroup_ = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
- id_ = id;
- }
-
- public Thread newThread(Runnable runnable)
- {
- String name = id_ + ":" + threadNbr_.getAndIncrement();
- Thread thread = new Thread(threadGroup_, runnable, name);
- return thread;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class is an implementation of the <i>ThreadFactory</i> interface. This
+ * is useful to give Java threads meaningful names which is useful when using
+ * a tool like JConsole.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ThreadFactoryImpl implements ThreadFactory
+{
+ protected String id_;
+ protected ThreadGroup threadGroup_;
+ protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
+
+ public ThreadFactoryImpl(String id)
+ {
+ SecurityManager sm = System.getSecurityManager();
+ threadGroup_ = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ id_ = id;
+ }
+
+ public Thread newThread(Runnable runnable)
+ {
+ String name = id_ + ":" + threadNbr_.getAndIncrement();
+ Thread thread = new Thread(threadGroup_, runnable, name);
+ return thread;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java Thu Jul 30 15:30:21 2009
@@ -1,42 +1,42 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-/**
- * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when
- * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-
-public class ThreadLocalContext
-{
- private static InheritableThreadLocal<Context> tls_ = new InheritableThreadLocal<Context>();
-
- public static void put(Context value)
- {
- tls_.set(value);
- }
-
- public static Context get()
- {
- return tls_.get();
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+/**
+ * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when
+ * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+public class ThreadLocalContext
+{
+ private static InheritableThreadLocal<Context> tls_ = new InheritableThreadLocal<Context>();
+
+ public static void put(Context value)
+ {
+ tls_.set(value);
+ }
+
+ public static Context get()
+ {
+ return tls_.get();
+ }
}
\ No newline at end of file