You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/13 23:23:44 UTC

svn commit: r898973 - in /incubator/cassandra/branches/cassandra-0.5: ./ src/java/org/apache/cassandra/concurrent/ test/unit/org/apache/cassandra/concurrent/

Author: jbellis
Date: Wed Jan 13 22:23:44 2010
New Revision: 898973

URL: http://svn.apache.org/viewvc?rev=898973&view=rev
Log:
assumption that all single-threaded executors have an unbounded queue is no longer valid. provide a policy for dealing with single thread executors w/ a full queue.  patch by jbellis; tested by Ryan Daum for CASSANDRA-694

Added:
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java   (with props)
Modified:
    incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java

Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Jan 13 22:23:44 2010
@@ -1,6 +1,8 @@
 0.5.0 final
  * avoid attempting to delete temporary bootstrap files twice (CASSANDRA-681)
  * fix bogus NaN in nodeprobe cfstats output (CASSANDRA-646)
+ * provide a policy for dealing with single thread executors w/ a full queue
+   (CASSANDRA-694)
 
 
 0.5.0 RC3

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Jan 13 22:23:44 2010
@@ -65,23 +65,35 @@
 
         if (maximumPoolSize > 1)
         {
+            // clearly strict serialization is not a requirement.  just make the calling thread execute.
             this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         }
         else
         {
             // preserve task serialization.  this is more complicated than it needs to be,
-            // since TPE rejects if queue.offer reports a full queue.
-            // the easiest option (since most of TPE.execute deals with private members)
-            // appears to be to wrap the given queue class with one whose offer
-            // simply delegates to put().  this would be ugly, since it violates both
-            // the spirit and letter of queue.offer, but effective.
-            // so far, though, all our serialized executors use unbounded queues,
-            // so actually implementing this has not been necessary.
+            // since TPE rejects if queue.offer reports a full queue.  we'll just
+            // override this with a handler that retries until it gets in.  ugly, but effective.
+            // (there is an extensive analysis of the options here at
+            //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
             this.setRejectedExecutionHandler(new RejectedExecutionHandler()
             {
-                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+                public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
                 {
-                    throw new AssertionError("Blocking serialized executor is not yet implemented");
+                    BlockingQueue<Runnable> queue = executor.getQueue();
+                    while (true)
+                    {
+                        if (executor.isShutdown())
+                            throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
+                        try
+                        {
+                            if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                                break;
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError(e);    
+                        }
+                    }
                 }
             });
         }

Added: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=898973&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java Wed Jan 13 22:23:44 2010
@@ -0,0 +1,61 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Tests DebuggableThreadPoolExecutor configured with a single thread and a bounded queue.
+ */
+public class DebuggableThreadPoolExecutorTest
+{
+    private static final int TASKS = 10;
+    private static final long TASK_MILLIS = 50;
+    private static final long DRAIN_TIMEOUT_SECONDS = 10;
+
+    /**
+     * Submits more tasks than the one-slot queue can hold, then checks that some were
+     * queued, that all of them completed, and that the elapsed time is consistent with
+     * the tasks having run one after another.
+     */
+    @Test
+    public void testSerialization() throws InterruptedException
+    {
+        LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
+        DebuggableThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(1,
+                                                                                 1,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.MILLISECONDS,
+                                                                                 q,
+                                                                                 new NamedThreadFactory("TEST"));
+        WrappedRunnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws InterruptedException
+            {
+                Thread.sleep(TASK_MILLIS);
+            }
+        };
+
+        // nanoTime is monotonic, so wall-clock adjustments cannot skew the measurement
+        long start = System.nanoTime();
+        for (int i = 0; i < TASKS; i++)
+        {
+            executor.submit(runnable);
+        }
+        int queued = q.size();
+
+        // shut down before asserting so the worker thread is released even when a check fails;
+        // awaitTermination blocks without spinning and bounds the wait if tasks never finish
+        executor.shutdown();
+        boolean drained = executor.awaitTermination(DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        long delta = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+        assert queued > 0 : queued;
+        assert drained : "executor did not terminate within " + DRAIN_TIMEOUT_SECONDS + "s";
+        assert executor.getCompletedTaskCount() == TASKS : executor.getCompletedTaskCount();
+        assert delta >= (TASKS - 1) * TASK_MILLIS : delta;
+    }
+}

Propchange: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native