You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/13 23:23:44 UTC
svn commit: r898973 - in /incubator/cassandra/branches/cassandra-0.5: ./
src/java/org/apache/cassandra/concurrent/
test/unit/org/apache/cassandra/concurrent/
Author: jbellis
Date: Wed Jan 13 22:23:44 2010
New Revision: 898973
URL: http://svn.apache.org/viewvc?rev=898973&view=rev
Log:
assumption that all single-threaded executors have an unbounded queue is no longer valid. provide a policy for dealing with single thread executors w/ a full queue. patch by jbellis; tested by Ryan Daum for CASSANDRA-694
Added:
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java (with props)
Modified:
incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Jan 13 22:23:44 2010
@@ -1,6 +1,8 @@
0.5.0 final
* avoid attempting to delete temporary bootstrap files twice (CASSANDRA-681)
* fix bogus NaN in nodeprobe cfstats output (CASSANDRA-646)
+ * provide a policy for dealing with single thread executors w/ a full queue
+ (CASSANDRA-694)
0.5.0 RC3
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Jan 13 22:23:44 2010
@@ -65,23 +65,35 @@
if (maximumPoolSize > 1)
{
+ // clearly strict serialization is not a requirement. just make the calling thread execute.
this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
else
{
// preserve task serialization. this is more complicated than it needs to be,
- // since TPE rejects if queue.offer reports a full queue.
- // the easiest option (since most of TPE.execute deals with private members)
- // appears to be to wrap the given queue class with one whose offer
- // simply delegates to put(). this would be ugly, since it violates both
- // the spirit and letter of queue.offer, but effective.
- // so far, though, all our serialized executors use unbounded queues,
- // so actually implementing this has not been necessary.
+ // since TPE rejects if queue.offer reports a full queue. we'll just
+ // override this with a handler that retries until it gets in. ugly, but effective.
+ // (there is an extensive analysis of the options here at
+ // http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
this.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
{
- throw new AssertionError("Blocking serialized executor is not yet implemented");
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
+ {
+ if (executor.isShutdown())
+ throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
}
});
}
Added: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=898973&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java Wed Jan 13 22:23:44 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DebuggableThreadPoolExecutorTest
+{
+ @Test
+ public void testSerialization() throws InterruptedException
+ {
+ LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
+ DebuggableThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.MILLISECONDS,
+ q,
+ new NamedThreadFactory("TEST"));
+ WrappedRunnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException
+ {
+ Thread.sleep(50);
+ }
+ };
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ executor.submit(runnable);
+ }
+ assert q.size() > 0 : q.size();
+ while (executor.getCompletedTaskCount() < 10)
+ continue;
+ long delta = System.currentTimeMillis() - start;
+ assert delta >= 9 * 50 : delta;
+ }
+}
Propchange: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
------------------------------------------------------------------------------
svn:eol-style = native