You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/07 16:18:20 UTC
svn commit: r822752 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Author: jbellis
Date: Wed Oct 7 14:18:20 2009
New Revision: 822752
URL: http://svn.apache.org/viewvc?rev=822752&view=rev
Log:
use CallerRunsPolicy instead of rejecting runnables on multi-threaded executors w/ blocking queues
patch by jbellis; tested by Chris Were for CASSANDRA-471
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=822752&r1=822751&r2=822752&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Oct 7 14:18:20 2009
@@ -35,18 +35,17 @@
{
private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
- private ObjectName objName;
public DebuggableThreadPoolExecutor(String threadPoolName)
{
this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
}
public DebuggableThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- NamedThreadFactory threadFactory)
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ NamedThreadFactory threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
@@ -54,13 +53,35 @@
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id);
- mbs.registerMBean(this, objName);
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
+
+ if (maximumPoolSize > 1)
+ {
+ this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+ else
+ {
+ // preserve task serialization. this is more complicated than it needs to be,
+ // since TPE rejects if queue.offer reports a full queue.
+ // the easiest option (since most of TPE.execute deals with private members)
+ // appears to be to wrap the given queue class with one whose offer
+ // simply delegates to put(). this would be ugly, since it violates both
+ // the spirit and letter of queue.offer, but effective.
+ // so far, though, all our serialized executors use unbounded queues,
+ // so actually implementing this has not been necessary.
+ this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+ {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ {
+ throw new AssertionError("Blocking serialized executor is not yet implemented");
+ }
+ });
+ }
}
public long getPendingTasks()