You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/07 16:18:20 UTC

svn commit: r822752 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java

Author: jbellis
Date: Wed Oct  7 14:18:20 2009
New Revision: 822752

URL: http://svn.apache.org/viewvc?rev=822752&view=rev
Log:
use CallerRunsPolicy instead of rejecting runnables on multi-threaded executors w/ blocking queues
patch by jbellis; tested by Chris Were for CASSANDRA-471

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=822752&r1=822751&r2=822752&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Oct  7 14:18:20 2009
@@ -35,18 +35,17 @@
 {
     private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
 
-    private ObjectName objName;
     public DebuggableThreadPoolExecutor(String threadPoolName) 
     {
         this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
     }
 
     public DebuggableThreadPoolExecutor(int corePoolSize,
-            int maximumPoolSize,
-            long keepAliveTime,
-            TimeUnit unit,
-            BlockingQueue<Runnable> workQueue,
-            NamedThreadFactory threadFactory)
+                                        int maximumPoolSize,
+                                        long keepAliveTime,
+                                        TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue,
+                                        NamedThreadFactory threadFactory)
     {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
         super.prestartAllCoreThreads();
@@ -54,13 +53,35 @@
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            objName = new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id);
-            mbs.registerMBean(this, objName);
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id));
         }
         catch (Exception e)
         {
             throw new RuntimeException(e);
         }
+
+        if (maximumPoolSize > 1)
+        {
+            this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        }
+        else
+        {
+            // Preserve task serialization.  This is more complicated than it needs to be,
+            // since ThreadPoolExecutor (TPE) rejects a task if queue.offer reports a full queue.
+            // The easiest option (since most of TPE.execute deals with private members)
+            // appears to be to wrap the given queue in one whose offer
+            // simply delegates to put().  This would be ugly, since it violates both
+            // the spirit and letter of queue.offer, but effective.
+            // So far, though, all our serialized executors use unbounded queues,
+            // so actually implementing this has not been necessary.
+            this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+            {
+                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+                {
+                    throw new AssertionError("Blocking serialized executor is not yet implemented");
+                }
+            });
+        }
     }
 
     public long getPendingTasks()