Posted to commits@cassandra.apache.org by be...@apache.org on 2014/05/29 23:25:32 UTC

[1/2] More efficient executor service for fast operations

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 a8d18c9c1 -> 5420b7a22


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..0fd53bb
--- /dev/null
+++ b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest
+{
+
+    private static final class WaitTask implements Runnable
+    {
+        final long nanos;
+
+        private WaitTask(long nanos)
+        {
+            this.nanos = nanos;
+        }
+
+        public void run()
+        {
+            LockSupport.parkNanos(nanos);
+        }
+    }
+
+    private static final class Result implements Comparable<Result>
+    {
+        final Future<?> future;
+        final long forecastedCompletion;
+
+        private Result(Future<?> future, long forecastedCompletion)
+        {
+            this.future = future;
+            this.forecastedCompletion = forecastedCompletion;
+        }
+
+        public int compareTo(Result that)
+        {
+            int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.hashCode(), that.hashCode());
+            if (c != 0)
+                return c;
+            return Integer.compare(this.future.hashCode(), that.future.hashCode());
+        }
+    }
+
+    private static final class Batch implements Comparable<Batch>
+    {
+        final TreeSet<Result> results;
+        final long timeout;
+        final int executorIndex;
+
+        private Batch(TreeSet<Result> results, long timeout, int executorIndex)
+        {
+            this.results = results;
+            this.timeout = timeout;
+            this.executorIndex = executorIndex;
+        }
+
+        public int compareTo(Batch that)
+        {
+            int c = Long.compare(this.timeout, that.timeout);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.results.size(), that.results.size());
+            if (c != 0)
+                return c;
+            return Integer.compare(this.hashCode(), that.hashCode());
+        }
+    }
+
+    @Test
+    public void testPromptnessOfExecution() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
+    }
+
+    private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        final int executorCount = 4;
+        int threadCount = 8;
+        int maxQueued = 1024;
+        final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
+        final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
+        final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
+
+        final int[] threadCounts = new int[executorCount];
+        final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
+        final ExecutorService[] executors = new ExecutorService[executorCount];
+        for (int i = 0 ; i < executors.length ; i++)
+        {
+            executors[i] = JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+            threadCounts[i] = threadCount;
+            workCount[i] = new WeibullDistribution(2, maxQueued);
+            threadCount *= 2;
+            maxQueued *= 2;
+        }
+
+        long runs = 0;
+        long events = 0;
+        final TreeSet<Batch> pending = new TreeSet<>();
+        final BitSet executorsWithWork = new BitSet(executorCount);
+        long until = 0;
+        // basic idea is to go through different levels of load on the executor service; initially this is all small batches
+        // (mostly within max queue size) of very short operations, moving to progressively larger batches
+        // (beyond max queued size), and longer operations
+        for (float multiplier = 0f ; multiplier < 2.01f ; )
+        {
+            if (System.nanoTime() > until)
+            {
+                System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
+                events = 0;
+                until = System.nanoTime() + intervalNanos;
+                multiplier += loadIncrement;
+                System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+            }
+
+            // wait a random amount of time so we submit new tasks in various stages of completion of the outstanding batches
+            long timeout;
+            if (pending.isEmpty()) timeout = 0;
+            else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
+            else if (pending.size() == executorCount) timeout = pending.first().timeout;
+            else timeout = (long) (Math.random() * pending.last().timeout);
+
+            while (!pending.isEmpty() && timeout > System.nanoTime())
+            {
+                Batch first = pending.first();
+                boolean complete = false;
+                try
+                {
+                    for (Result result : first.results.descendingSet())
+                        result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
+                    complete = true;
+                }
+                catch (TimeoutException e)
+                {
+                }
+                if (!complete && System.nanoTime() > first.timeout)
+                {
+                    for (Result result : first.results)
+                        if (!result.future.isDone())
+                            throw new AssertionError();
+                    complete = true;
+                }
+                if (complete)
+                {
+                    pending.pollFirst();
+                    executorsWithWork.clear(first.executorIndex);
+                }
+            }
+
+            // if we've emptied the executors, give all our threads an opportunity to spin down
+            if (timeout == Long.MAX_VALUE)
+                Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+
+            // submit a random batch to the first free executor service
+            int executorIndex = executorsWithWork.nextClearBit(0);
+            if (executorIndex >= executorCount)
+                continue;
+            executorsWithWork.set(executorIndex);
+            ExecutorService executor = executors[executorIndex];
+            TreeSet<Result> results = new TreeSet<>();
+            int count = (int) (workCount[executorIndex].sample() * multiplier);
+            long targetTotalElapsed = 0;
+            long start = System.nanoTime();
+            long baseTime;
+            if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
+            else  baseTime = 0;
+            for (int j = 0 ; j < count ; j++)
+            {
+                long time;
+                if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
+                else time = (long) (baseTime * Math.random());
+                if (time < minWorkTime)
+                    time = minWorkTime;
+                if (time > maxWorkTime)
+                    time = maxWorkTime;
+                targetTotalElapsed += time;
+                Future<?> future = executor.submit(new WaitTask(time));
+                results.add(new Result(future, System.nanoTime() + time));
+            }
+            long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
+                       + TimeUnit.MILLISECONDS.toNanos(100L);
+            long now = System.nanoTime();
+            if (runs++ > executorCount && now > end)
+                throw new AssertionError();
+            events += results.size();
+            pending.add(new Batch(results, end, executorIndex));
+//            System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        // do longer test
+        new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
+    }
+
+}
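
The promptness check above allows each batch a completion window equal to its total requested work time divided evenly across the executor's threads, plus a fixed 100ms grace period. A rough illustration of that arithmetic, with invented numbers rather than values sampled by the test:

    import java.util.concurrent.TimeUnit;

    public class BatchDeadlineExample
    {
        public static void main(String[] args)
        {
            // hypothetical batch: 512 tasks of ~200us each, submitted to an executor with 8 worker threads
            long targetTotalElapsed = 512L * TimeUnit.MICROSECONDS.toNanos(200); // ~102.4ms of requested work
            int threadCount = 8;

            // same formula as the test: ideal parallel speed-up plus a 100ms grace period
            long permitted = (long) Math.ceil(targetTotalElapsed / (double) threadCount)
                             + TimeUnit.MILLISECONDS.toNanos(100);
            System.out.printf("batch must complete within ~%dms%n", TimeUnit.NANOSECONDS.toMillis(permitted));
            // prints ~112ms: 12.8ms of perfectly parallelised work + 100ms grace
        }
    }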

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 48b8ac9..c3ce810 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 


[2/2] git commit: More efficient executor service for fast operations

Posted by be...@apache.org.
More efficient executor service for fast operations

Patch by Benedict Elliott Smith; reviewed by Jason Brown for CASSANDRA-4718


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5420b7a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5420b7a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5420b7a2

Branch: refs/heads/cassandra-2.1
Commit: 5420b7a2296d230e7fd5bc2f41fc6472a9c8b55e
Parents: a8d18c9
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Thu May 29 22:16:03 2014 +0100
Committer: belliottsmith <gi...@sub.laerad.com>
Committed: Thu May 29 22:20:21 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../AbstractTracingAwareExecutorService.java    | 227 ++++++++++++
 .../DebuggableThreadPoolExecutor.java           |   5 +
 .../JMXEnabledSharedExecutorPool.java           | 114 ++++++
 .../cassandra/concurrent/SEPExecutor.java       | 274 ++++++++++++++
 .../apache/cassandra/concurrent/SEPWorker.java  | 368 +++++++++++++++++++
 .../concurrent/SharedExecutorPool.java          |  99 +++++
 .../cassandra/concurrent/StageManager.java      |  22 +-
 .../concurrent/TracingAwareExecutorService.java |   3 +
 .../apache/cassandra/metrics/SEPMetrics.java    |  98 +++++
 .../metrics/ThreadPoolMetricNameFactory.java    |  47 +++
 .../cassandra/metrics/ThreadPoolMetrics.java    |  28 +-
 .../apache/cassandra/net/MessagingService.java  |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  |   2 +-
 .../apache/cassandra/repair/RepairSession.java  |   1 +
 .../cassandra/service/AbstractReadExecutor.java |   9 +-
 .../service/AbstractWriteResponseHandler.java   |   2 +-
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../service/TruncateResponseHandler.java        |   2 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   2 +-
 .../org/apache/cassandra/transport/Message.java |   2 +-
 .../transport/RequestThreadPoolExecutor.java    |  19 +-
 .../apache/cassandra/utils/SimpleCondition.java |  84 -----
 .../utils/concurrent/SimpleCondition.java       |  97 +++++
 .../cassandra/utils/concurrent/WaitQueue.java   |  72 ++--
 .../concurrent/LongSharedExecutorPoolTest.java  | 228 ++++++++++++
 .../apache/cassandra/repair/ValidatorTest.java  |   2 +-
 28 files changed, 1638 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f313278..3b09b7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc1
+ * More efficient executor service for fast operations (CASSANDRA-4718)
  * Move less common tools into a new cassandra-tools package (CASSANDRA-7160)
  * Support more concurrent requests in native protocol (CASSANDRA-7231)
  * Add tab-completion to debian nodetool packaging (CASSANDRA-6421)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
new file mode 100644
index 0000000..544e8a7
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.tracing.Tracing.isTracing;
+
+public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class);
+
+    protected abstract void addTask(FutureTask<?> futureTask);
+    protected abstract void onCompletion();
+
+    /** Task Submission / Creation / Objects **/
+
+    public <T> FutureTask<T> submit(Callable<T> task)
+    {
+        return submit(newTaskFor(task));
+    }
+
+    public FutureTask<?> submit(Runnable task)
+    {
+        return submit(newTaskFor(task, null));
+    }
+
+    public <T> FutureTask<T> submit(Runnable task, T result)
+    {
+        return submit(newTaskFor(task, result));
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+    {
+        return newTaskFor(runnable, result, null);
+    }
+
+    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState)
+    {
+        if (traceState != null || isTracing())
+        {
+            if (runnable instanceof TraceSessionFutureTask)
+                return (TraceSessionFutureTask<T>) runnable;
+            return new TraceSessionFutureTask<T>(runnable, result, traceState);
+        }
+        if (runnable instanceof FutureTask)
+            return (FutureTask<T>) runnable;
+        return new FutureTask<>(runnable, result);
+    }
+
+    protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+    {
+        if (isTracing())
+        {
+            if (callable instanceof TraceSessionFutureTask)
+                return (TraceSessionFutureTask<T>) callable;
+            return new TraceSessionFutureTask<T>(callable, null);
+        }
+        if (callable instanceof java.util.concurrent.FutureTask)
+            return (FutureTask<T>) callable;
+        return new FutureTask<>(callable);
+    }
+
+    private class TraceSessionFutureTask<T> extends FutureTask<T>
+    {
+        private final TraceState state;
+
+        public TraceSessionFutureTask(Callable<T> callable, TraceState state)
+        {
+            super(callable);
+            this.state = state;
+        }
+
+        public TraceSessionFutureTask(Runnable runnable, T result, TraceState state)
+        {
+            super(runnable, result);
+            this.state = state;
+        }
+
+        public void run()
+        {
+            Tracing.instance.set(state);
+            try
+            {
+                super.run();
+            }
+            finally
+            {
+                Tracing.instance.set(null);
+            }
+        }
+    }
+
+    class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+    {
+        private boolean failure;
+        private Object result = this;
+        private final Callable<T> callable;
+
+        public FutureTask(Callable<T> callable)
+        {
+            this.callable = callable;
+        }
+        public FutureTask(Runnable runnable, T result)
+        {
+            this(Executors.callable(runnable, result));
+        }
+
+        public void run()
+        {
+            try
+            {
+                result = callable.call();
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
+                result = t;
+                failure = true;
+            }
+            finally
+            {
+                signalAll();
+                onCompletion();
+            }
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            return false;
+        }
+
+        public boolean isCancelled()
+        {
+            return false;
+        }
+
+        public boolean isDone()
+        {
+            return isSignaled();
+        }
+
+        public T get() throws InterruptedException, ExecutionException
+        {
+            await();
+            Object result = this.result;
+            if (failure)
+                throw new ExecutionException((Throwable) result);
+            return (T) result;
+        }
+
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+        {
+            if (!await(timeout, unit))
+                throw new TimeoutException();
+            Object result = this.result;
+            if (failure)
+                throw new ExecutionException((Throwable) result);
+            return (T) result;
+        }
+    }
+
+    private <T> FutureTask<T> submit(FutureTask<T> task)
+    {
+        addTask(task);
+        return task;
+    }
+
+    public void execute(Runnable command)
+    {
+        addTask(newTaskFor(command, null));
+    }
+
+    public void execute(Runnable command, TraceState state)
+    {
+        addTask(newTaskFor(command, null, state));
+    }
+
+}
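
The FutureTask above is built directly on Cassandra's SimpleCondition rather than on java.util.concurrent.FutureTask: run() records the result (or the Throwable) and signals the condition, and get() simply awaits that signal. The same pattern, sketched with a standard CountDownLatch standing in for SimpleCondition (class and method names here are illustrative only, not part of the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;

    // A future whose completion is a one-shot signal, mirroring the FutureTask-extends-SimpleCondition pattern above.
    final class SignalFuture<T> implements Runnable
    {
        private final Callable<T> callable;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Object result;
        private volatile boolean failure;

        SignalFuture(Callable<T> callable) { this.callable = callable; }

        public void run()
        {
            try { result = callable.call(); }
            catch (Throwable t) { result = t; failure = true; }
            finally { done.countDown(); }            // the equivalent of signalAll()
        }

        @SuppressWarnings("unchecked")
        public T get() throws InterruptedException, ExecutionException
        {
            done.await();                            // the equivalent of awaiting the condition
            if (failure)
                throw new ExecutionException((Throwable) result);
            return (T) result;
        }
    }

As in the class above, completed results and failures share a single field, and cancellation is simply not supported.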

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 665f0b0..cd9e9e4 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -138,6 +138,11 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
                       : new TraceSessionWrapper<Object>(command, state));
     }
 
+    public void maybeExecuteImmediately(Runnable command)
+    {
+        execute(command);
+    }
+
     // execute does not call newTaskFor
     @Override
     public void execute(Runnable command)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
new file mode 100644
index 0000000..d70e524
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
@@ -0,0 +1,114 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.metrics.SEPMetrics;
+
+public class JMXEnabledSharedExecutorPool extends SharedExecutorPool
+{
+
+    public static final JMXEnabledSharedExecutorPool SHARED = new JMXEnabledSharedExecutorPool("SharedPool");
+
+    public JMXEnabledSharedExecutorPool(String poolName)
+    {
+        super(poolName);
+    }
+
+    public interface JMXEnabledSEPExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
+    {
+    }
+
+    public class JMXEnabledSEPExecutor extends SEPExecutor implements JMXEnabledSEPExecutorMBean
+    {
+
+        private final SEPMetrics metrics;
+        private final String mbeanName;
+
+        public JMXEnabledSEPExecutor(int poolSize, int maxQueuedLength, String name, String jmxPath)
+        {
+            super(JMXEnabledSharedExecutorPool.this, poolSize, maxQueuedLength);
+            metrics = new SEPMetrics(this, jmxPath, name);
+
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + name;
+
+            try
+            {
+                mbs.registerMBean(this, new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void unregisterMBean()
+        {
+            try
+            {
+                ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            // release metrics
+            metrics.release();
+        }
+
+        @Override
+        public synchronized void shutdown()
+        {
+            // synchronized, because there is no way to access super.mainLock, which would be
+            // the preferred way to make this threadsafe
+            if (!isShutdown())
+            {
+                unregisterMBean();
+            }
+            super.shutdown();
+        }
+
+        public int getCoreThreads()
+        {
+            return 0;
+        }
+
+        public void setCoreThreads(int number)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void setMaximumThreads(int number)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String name, String jmxPath)
+    {
+        JMXEnabledSEPExecutor executor = new JMXEnabledSEPExecutor(maxConcurrency, maxQueuedTasks, name, jmxPath);
+        executors.add(executor);
+        return executor;
+    }
+}
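
newExecutor(...) is the factory callers use to obtain an executor backed by the shared pool; the long test above exercises it through the SHARED singleton. A minimal usage sketch (the concurrency, queue length and names below are arbitrary):

    import java.util.concurrent.Future;
    import org.apache.cassandra.concurrent.JMXEnabledSharedExecutorPool;
    import org.apache.cassandra.concurrent.TracingAwareExecutorService;

    public class SharedPoolUsage
    {
        public static void main(String[] args) throws Exception
        {
            // at most 8 concurrent workers and 1024 queued tasks; the MBean is registered as
            // org.apache.cassandra.example:type=demo, following the name/jmxPath arguments
            TracingAwareExecutorService executor =
                    JMXEnabledSharedExecutorPool.SHARED.newExecutor(8, 1024, "demo", "example");

            Future<?> f = executor.submit(new Runnable()
            {
                public void run()
                {
                    System.out.println("ran on " + Thread.currentThread().getName());
                }
            });
            f.get();
            executor.shutdown();
        }
    }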

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
new file mode 100644
index 0000000..bdc0045
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -0,0 +1,274 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.concurrent.SEPWorker.Work;
+
+public class SEPExecutor extends AbstractTracingAwareExecutorService
+{
+    private final SharedExecutorPool pool;
+
+    private final int maxWorkers;
+    private final int maxTasksQueued;
+
+    // stores both a set of work permits and task permits:
+    //  bottom 32 bits are number of queued tasks, in the range [0..maxTasksQueued]   (initially 0)
+    //  top 32 bits are number of work permits available in the range [0..maxWorkers]   (initially maxWorkers)
+    private final AtomicLong permits = new AtomicLong();
+
+    // producers wait on this when there is no room on the queue
+    private final WaitQueue hasRoom = new WaitQueue();
+    private final AtomicLong totalBlocked = new AtomicLong();
+    private final AtomicInteger currentlyBlocked = new AtomicInteger();
+    private final AtomicLong completedTasks = new AtomicLong();
+
+    volatile boolean shuttingDown = false;
+    final SimpleCondition shutdown = new SimpleCondition();
+
+    // TODO: see if other queue implementations might improve throughput
+    protected final ConcurrentLinkedQueue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();
+
+    SEPExecutor(SharedExecutorPool pool, int maxWorkers, int maxTasksQueued)
+    {
+        this.pool = pool;
+        this.maxWorkers = maxWorkers;
+        this.maxTasksQueued = maxTasksQueued;
+        this.permits.set(combine(0, maxWorkers));
+    }
+
+    protected void onCompletion()
+    {
+        completedTasks.incrementAndGet();
+    }
+
+    // schedules another worker for this pool if there is work outstanding and there are no spinning threads that
+    // will self-assign to it in the immediate future
+    boolean maybeSchedule()
+    {
+        if (pool.spinningCount.get() > 0 || !takeWorkPermit(true))
+            return false;
+
+        pool.schedule(new Work(this));
+        return true;
+    }
+
+    protected void addTask(FutureTask<?> task)
+    {
+        // we add to the queue first, so that when a worker takes a task permit it can be certain there is a task available
+        // this permits us to schedule threads non-spuriously; it also means work is serviced fairly
+        tasks.add(task);
+        int taskPermits;
+        while (true)
+        {
+            long current = permits.get();
+            taskPermits = taskPermits(current);
+            // because there is no difference in practical terms between the task permit being added or not (the work is already in existence)
+            // we always add our permit, but block after the fact if we breached the queue limit
+            if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits + 1)))
+                break;
+        }
+
+        if (taskPermits == 0)
+        {
+            // we only need to schedule a thread if there are no tasks already waiting to be processed, as
+            // the original enqueue will have started a thread to service its work which will have itself
+            // spawned helper workers that would have either exhausted the available tasks or are still being spawned.
+            // to avoid incurring any unnecessary signalling penalties we also do not take any work to hand to the new
+            // worker, we simply start a worker in a spinning state
+            pool.maybeStartSpinningWorker();
+        }
+        else if (taskPermits >= maxTasksQueued)
+        {
+            // register to receive a signal once a task is processed bringing the queue below its threshold
+            WaitQueue.Signal s = hasRoom.register();
+
+            // we will only be signalled once the queue drops below full, so this creates equivalent external behaviour
+            // however the advantage is that we never wake-up spuriously;
+            // we choose to always sleep, even if in the intervening time the queue has dropped below limit,
+            // so long as we _will_ eventually receive a signal
+            if (taskPermits(permits.get()) > maxTasksQueued)
+            {
+                // if we're blocking, we might as well directly schedule a worker if we aren't already at max
+                if (takeWorkPermit(true))
+                    pool.schedule(new Work(this));
+                totalBlocked.incrementAndGet();
+                currentlyBlocked.incrementAndGet();
+                s.awaitUninterruptibly();
+                currentlyBlocked.decrementAndGet();
+            }
+            else // don't propagate our signal when we cancel, just cancel
+                s.cancel();
+        }
+    }
+
+    // takes permission to perform a task, if any are available; once taken it is guaranteed
+    // that a subsequent call to tasks.poll() will return some work
+    boolean takeTaskPermit()
+    {
+        while (true)
+        {
+            long current = permits.get();
+            int taskPermits = taskPermits(current);
+            if (taskPermits == 0)
+                return false;
+            if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits - 1)))
+            {
+                if (taskPermits == maxTasksQueued && hasRoom.hasWaiters())
+                    hasRoom.signalAll();
+                return true;
+            }
+        }
+    }
+
+    // takes a worker permit and (optionally) a task permit simultaneously; if one of the two is unavailable, returns false
+    boolean takeWorkPermit(boolean takeTaskPermit)
+    {
+        int taskDelta = takeTaskPermit ? 1 : 0;
+        while (true)
+        {
+            long current = permits.get();
+            int workPermits = workPermits(current);
+            int taskPermits = taskPermits(current);
+            if (workPermits == 0 || taskPermits == 0)
+                return false;
+            if (permits.compareAndSet(current, combine(taskPermits - taskDelta, workPermits - 1)))
+            {
+                if (takeTaskPermit && taskPermits == maxTasksQueued && hasRoom.hasWaiters())
+                    hasRoom.signalAll();
+                return true;
+            }
+        }
+    }
+
+    // gives up a work permit
+    void returnWorkPermit()
+    {
+        while (true)
+        {
+            long current = permits.get();
+            int workPermits = workPermits(current);
+            if (permits.compareAndSet(current, updateWorkPermits(current, workPermits + 1)))
+                return;
+        }
+    }
+
+    public void maybeExecuteImmediately(Runnable command)
+    {
+        FutureTask<?> ft = newTaskFor(command, null);
+        if (!takeWorkPermit(false))
+        {
+            addTask(ft);
+        }
+        else
+        {
+            try
+            {
+                ft.run();
+            }
+            finally
+            {
+                returnWorkPermit();
+                // we have to maintain our invariant of always scheduling after any work is performed
+                // in this case in particular we are not processing the rest of the queue anyway, and so
+                // the work permit may go wasted if we don't immediately attempt to spawn another worker
+                maybeSchedule();
+            }
+        }
+    }
+
+    public synchronized void shutdown()
+    {
+        shuttingDown = true;
+        pool.executors.remove(this);
+        if (getActiveCount() == 0)
+            shutdown.signalAll();
+    }
+
+    public synchronized List<Runnable> shutdownNow()
+    {
+        shutdown();
+        List<Runnable> aborted = new ArrayList<>();
+        while (takeTaskPermit())
+            aborted.add(tasks.poll());
+        return aborted;
+    }
+
+    public boolean isShutdown()
+    {
+        return shuttingDown;
+    }
+
+    public boolean isTerminated()
+    {
+        return shuttingDown && shutdown.isSignaled();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        shutdown.await(timeout, unit);
+        return isTerminated();
+    }
+
+    public long getPendingTasks()
+    {
+        return taskPermits(permits.get());
+    }
+
+    public long getCompletedTasks()
+    {
+        return completedTasks.get();
+    }
+
+    public int getActiveCount()
+    {
+        return maxWorkers - workPermits(permits.get());
+    }
+
+    public int getTotalBlockedTasks()
+    {
+        return (int) totalBlocked.get();
+    }
+
+    public int getMaximumThreads()
+    {
+        return maxWorkers;
+    }
+
+    public int getCurrentlyBlockedTasks()
+    {
+        return currentlyBlocked.get();
+    }
+
+    private static int taskPermits(long both)
+    {
+        return (int) both;
+    }
+
+    private static int workPermits(long both)
+    {
+        return (int) (both >>> 32);
+    }
+
+    private static long updateTaskPermits(long prev, int taskPermits)
+    {
+        return (prev & (-1L << 32)) | taskPermits;
+    }
+
+    private static long updateWorkPermits(long prev, int workPermits)
+    {
+        return (((long) workPermits) << 32) | (prev & (-1L >>> 32));
+    }
+
+    private static long combine(int taskPermits, int workPermits)
+    {
+        return (((long) workPermits) << 32) | taskPermits;
+    }
+}
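
The permits field packs two counters into a single AtomicLong - queued-task permits in the low 32 bits and available work permits in the high 32 bits - so both can be read and updated together with one compareAndSet. A small worked example of the encoding helpers at the bottom of the class, using arbitrary values:

    public class PermitPackingExample
    {
        // same encoding as SEPExecutor: low 32 bits = queued tasks, high 32 bits = available work permits
        static long combine(int taskPermits, int workPermits) { return (((long) workPermits) << 32) | taskPermits; }
        static int taskPermits(long both)  { return (int) both; }
        static int workPermits(long both)  { return (int) (both >>> 32); }

        public static void main(String[] args)
        {
            long permits = combine(0, 16);                 // empty queue, 16 work permits (initial state for maxWorkers=16)
            permits = combine(taskPermits(permits) + 3,    // three tasks enqueued...
                              workPermits(permits) - 1);   // ...and one worker takes a work permit
            System.out.println(taskPermits(permits) + " queued, " + workPermits(permits) + " work permits");
            // prints: 3 queued, 15 work permits
        }
    }

takeWorkPermit() relies on this packing to claim a work permit and a task permit in the same CAS.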

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
new file mode 100644
index 0000000..084bba7
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -0,0 +1,368 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class);
+
+    final Long workerId;
+    final Thread thread;
+    final SharedExecutorPool pool;
+
+    // prevStopCheck stores the value of pool.stopCheck after we last incremented it; if it hasn't changed,
+    // we know nobody else was spinning in the interval, so we increment our soleSpinnerSpinTime accordingly,
+    // and otherwise we set it to zero; this is then used to terminate the final spinning thread, as the coordinated
+    // strategy can only work when there are multiple threads spinning (as more sleep time must elapse than real time)
+    long prevStopCheck = 0;
+    long soleSpinnerSpinTime = 0;
+
+    SEPWorker(Long workerId, Work initialState, SharedExecutorPool pool)
+    {
+        this.pool = pool;
+        this.workerId = workerId;
+        thread = new Thread(this, pool.poolName + "-Worker-" + workerId);
+        thread.setDaemon(true);
+        set(initialState);
+        thread.start();
+    }
+
+    public void run()
+    {
+        /**
+         * we maintain two important invariants:
+         * 1)   after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed
+         *      promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to
+         *      avoid signalling if there are _any_ spinning threads. we achieve this by simply calling maybeSchedule()
+         *      on each queue if on decrementing the spin counter we hit zero.
+         * 2)   before processing a task on a given queue, we attempt to assign another worker to the _same queue only_;
+         *      this allows a producer to skip signalling work if the task queue is currently non-empty, and in conjunction
+         *      with invariant (1) ensures that if any thread was spinning when a task was added to any executor, that
+         *      task will be processed immediately if work permits are available
+         */
+
+        SEPExecutor assigned = null;
+        Runnable task = null;
+        try
+        {
+            while (true)
+            {
+                if (isSpinning() && !selfAssign())
+                {
+                    doWaitSpin();
+                    continue;
+                }
+
+                // if stop was signalled, go to sleep (don't try self-assign; being put to sleep is rare, so let's obey it
+                // whenever we receive it - though we don't apply this constraint to producers, who may reschedule us before
+                // we go to sleep)
+                if (stop())
+                    while (isStopped())
+                        LockSupport.park();
+
+                // we can be assigned any state from STOPPED, so loop if we don't actually have any tasks assigned
+                assigned = get().assigned;
+                if (assigned == null)
+                    continue;
+                task = assigned.tasks.poll();
+
+                // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
+                // (which is also a state that will never be interrupted externally)
+                set(Work.WORKING);
+                boolean shutdown;
+                while (true)
+                {
+                    // before we process any task, we maybe schedule a new worker _to our executor only_; this
+                    // ensures that even once all spinning threads have found work, if more work is left to be serviced
+                    // and permits are available, it will be dealt with immediately.
+                    assigned.maybeSchedule();
+
+                    // we know there is work waiting, as we have a work permit, so poll() will always succeed
+                    task.run();
+                    task = null;
+
+                    // if we're shutting down, or we fail to take a permit, we don't perform any more work
+                    if ((shutdown = assigned.shuttingDown) || !assigned.takeTaskPermit())
+                        break;
+                    task = assigned.tasks.poll();
+                }
+
+                // return our work permit, and maybe signal shutdown
+                assigned.returnWorkPermit();
+                if (shutdown && assigned.getActiveCount() == 0)
+                    assigned.shutdown.signalAll();
+                assigned = null;
+
+                // try to immediately reassign ourselves some work; if we fail, start spinning
+                if (!selfAssign())
+                    startSpinning();
+            }
+        }
+        catch (Throwable t)
+        {
+            while (true)
+            {
+                if (get().assigned != null)
+                {
+                    assigned = get().assigned;
+                    set(Work.WORKING);
+                }
+                if (assign(Work.STOPPED, true))
+                    break;
+            }
+            if (assigned != null)
+                assigned.returnWorkPermit();
+            if (task != null)
+                logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
+            else
+                logger.error("Unexpected exception killed worker: {}", t);
+        }
+    }
+
+    // try to assign this worker the provided work
+    // valid states to assign are SPINNING, STOP_SIGNALLED, (ASSIGNED);
+    // restores invariants of the various states (e.g. spinningCount, descheduled collection and thread park status)
+    boolean assign(Work work, boolean self)
+    {
+        Work state = get();
+        while (state.canAssign(self))
+        {
+            if (!compareAndSet(state, work))
+            {
+                state = get();
+                continue;
+            }
+            // if we were spinning, exit the state (decrement the count); this is valid even if we are already spinning,
+            // as the assigning thread will have incremented the spinningCount
+            if (state.isSpinning())
+                stopSpinning();
+
+            // if we're being descheduled, place ourselves in the descheduled collection
+            if (work.isStop())
+                pool.descheduled.put(workerId, this);
+
+            // if we're currently stopped, and the new state is not a stop signal
+            // (which we can immediately convert to stopped), unpark the worker
+            if (state.isStopped() && (!work.isStop() || !stop()))
+                LockSupport.unpark(thread);
+            return true;
+        }
+        return false;
+    }
+
+    // try to assign ourselves an executor with work available
+    private boolean selfAssign()
+    {
+        // if we aren't permitted to assign in this state, fail
+        if (!get().canAssign(true))
+            return false;
+        for (SEPExecutor exec : pool.executors)
+        {
+            if (exec.takeWorkPermit(true))
+            {
+                Work work = new Work(exec);
+                // we successfully started work on this executor, so we must either assign it to ourselves or ...
+                if (assign(work, true))
+                    return true;
+                // ... if we fail, schedule it to another worker
+                pool.schedule(work);
+                // and return success as we must have already been assigned a task
+                assert get().assigned != null;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // we can only call this when our state is WORKING, and no other thread may change our state in this case;
+    // so in this case only we do not need to CAS. We increment the spinningCount and add ourselves to the spinning
+    // collection at the same time
+    private void startSpinning()
+    {
+        assert get() == Work.WORKING;
+        pool.spinningCount.incrementAndGet();
+        set(Work.SPINNING);
+    }
+
+    // exit the spinning state; if there are no remaining spinners, we immediately try and schedule work for all executors
+    // so that any producer is safe to not spin up a worker when they see a spinning thread (invariant (1) above)
+    private void stopSpinning()
+    {
+        if (pool.spinningCount.decrementAndGet() == 0)
+            for (SEPExecutor executor : pool.executors)
+                executor.maybeSchedule();
+        prevStopCheck = soleSpinnerSpinTime = 0;
+    }
+
+    // perform a sleep-spin, incrementing pool.stopCheck accordingly
+    private void doWaitSpin()
+    {
+        // pick a random sleep interval based on the number of threads spinning, so that
+        // we should always have a thread about to wake up, but most threads are sleeping
+        long sleep = 10000 * pool.spinningCount.get();
+        sleep = Math.min(1000000, sleep);
+        sleep *= Math.random();
+        sleep = Math.max(10000, sleep);
+
+        long start = System.nanoTime();
+
+        // place ourselves in the spinning collection; if we clash with another thread just exit
+        Long target = start + sleep;
+        if (pool.spinning.putIfAbsent(target, this) != null)
+            return;
+        LockSupport.parkNanos(sleep);
+
+        // remove ourselves (if we haven't been already) - we should be at or near the front, so should be cheap-ish
+        pool.spinning.remove(target, this);
+
+        // finish timing and grab spinningTime (before we finish timing so it is under rather than overestimated)
+        long end = System.nanoTime();
+        long spin = end - start;
+        long stopCheck = pool.stopCheck.addAndGet(spin);
+        maybeStop(stopCheck, end);
+        if (prevStopCheck + spin == stopCheck)
+            soleSpinnerSpinTime += spin;
+        else
+            soleSpinnerSpinTime = 0;
+        prevStopCheck = stopCheck;
+    }
+
+    private static final long stopCheckInterval = TimeUnit.MILLISECONDS.toNanos(10L);
+
+    // stops a worker if elapsed real time is less than elapsed spin time, as this implies the equivalent of
+    // at least one worker achieved nothing in the interval. we achieve this by maintaining a stopCheck which
+    // is initialised to a negative offset from realtime; as we spin we add to this value, and if we ever exceed
+    // realtime we have spun too much and deschedule; if we get too far behind realtime, we reset to our initial offset
+    private void maybeStop(long stopCheck, long now)
+    {
+        long delta = now - stopCheck;
+        if (delta <= 0)
+        {
+            // if stopCheck has caught up with present, we've been spinning too much, so if we can atomically
+            // set it to the past again, we should stop a worker
+            if (pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
+            {
+                // try and stop ourselves;
+                // if we've already been assigned work stop another worker
+                if (!assign(Work.STOP_SIGNALLED, true))
+                    pool.schedule(Work.STOP_SIGNALLED);
+            }
+        }
+        else if (soleSpinnerSpinTime > stopCheckInterval && pool.spinningCount.get() == 1)
+        {
+            // permit self-stopping
+            assign(Work.STOP_SIGNALLED, true);
+        }
+        else
+        {
+            // if stop check has gotten too far behind present, update it so new spins can affect it
+            while (delta > stopCheckInterval * 2 && !pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
+            {
+                stopCheck = pool.stopCheck.get();
+                delta = now - stopCheck;
+            }
+        }
+    }
+
+    private boolean isSpinning()
+    {
+        return get().isSpinning();
+    }
+
+    private boolean stop()
+    {
+        return get().isStop() && compareAndSet(Work.STOP_SIGNALLED, Work.STOPPED);
+    }
+
+    private boolean isStopped()
+    {
+        return get().isStopped();
+    }
+
+    /**
+     * Represents, and communicates changes to, a worker's work state - there are three non-actively-working
+     * states (STOP_SIGNALLED, STOPPED and SPINNING) and two working states: WORKING and (ASSIGNED), the last
+     * being represented by a non-static instance with its "assigned" executor set.
+     *
+     * STOPPED:         indicates the worker is descheduled, and whilst it accepts work in this state (causing it to
+     *                  be rescheduled) it will generally not be considered for work until all other worker threads are busy.
+     *                  In this state we should be present in the pool.descheduled collection, and should be parked
+     * -> (ASSIGNED)|SPINNING
+     * STOP_SIGNALLED:  the worker has been asked to deschedule itself, but has not yet done so; only entered from a SPINNING
+     *                  state, and generally communicated to itself, but may be set from any worker. This state may be preempted
+     *                  and replaced with (ASSIGNED) or SPINNING
+     *                  In this state we should be present in the pool.descheduled collection
+     * -> (ASSIGNED)|STOPPED|SPINNING
+     * SPINNING:        indicates the worker has no work to perform, so is performing a friendly wait-based-spinning
+     *                  until it either is (ASSIGNED) some work (by itself or another thread), or sent STOP_SIGNALLED
+     *                  In this state we _may_ be in the pool.spinning collection (but only if we are in the middle of a sleep)
+     * -> (ASSIGNED)|STOP_SIGNALLED|SPINNING
+     * (ASSIGNED):      asks the worker to perform some work against the specified executor, and preassigns a task permit
+     *                  from that executor so that in this state there is always work to perform.
+     *                  In general a worker assigns itself this state, but sometimes it may assign another worker the state
+     *                  either if there is work outstanding and no spinning threads, or there is a race to self-assign
+     * -> WORKING
+     * WORKING:         indicates the worker is actively processing an executor's task queue; in this state it accepts
+     *                  no state changes/communications, except from itself; it usually exits this mode into SPINNING,
+     *                  but if work is immediately available on another executor it self-triggers (ASSIGNED)
+     * -> SPINNING|(ASSIGNED)
+     */
+
+    static final class Work
+    {
+        static final Work STOP_SIGNALLED = new Work();
+        static final Work STOPPED = new Work();
+        static final Work SPINNING = new Work();
+        static final Work WORKING = new Work();
+
+        final SEPExecutor assigned;
+
+        Work(SEPExecutor executor)
+        {
+            this.assigned = executor;
+        }
+
+        private Work()
+        {
+            this.assigned = null;
+        }
+
+        boolean canAssign(boolean self)
+        {
+            // we can assign work if there isn't new work already assigned and either
+            // 1) we are assigning to ourselves
+            // 2) the worker we are assigning to is not already in the middle of WORKING
+            return assigned == null && (self || !isWorking());
+        }
+
+        boolean isSpinning()
+        {
+            return this == Work.SPINNING;
+        }
+
+        boolean isWorking()
+        {
+            return this == Work.WORKING;
+        }
+
+        boolean isStop()
+        {
+            return this == Work.STOP_SIGNALLED;
+        }
+
+        boolean isStopped()
+        {
+            return this == Work.STOPPED;
+        }
+
+        boolean isAssigned()
+        {
+            return assigned != null;
+        }
+    }
+}
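
maybeStop() deschedules a worker once cumulative sleep-spin time overtakes real time: stopCheck starts 10ms behind the clock, each completed spin adds its duration, and if the counter ever catches up with the present a STOP_SIGNALLED is issued. A rough numerical sketch of that bookkeeping, with invented spin durations:

    import java.util.concurrent.TimeUnit;

    public class StopCheckExample
    {
        static long ms(long v) { return TimeUnit.MILLISECONDS.toNanos(v); }

        public static void main(String[] args)
        {
            long stopCheckInterval = ms(10);        // same constant as SEPWorker.stopCheckInterval

            long now = 0;                           // pretend clock, in nanos
            long stopCheck = now - stopCheckInterval;

            // three workers each sleep-spin ~6ms while only 5ms of real time elapses
            now += ms(5);
            for (long spin : new long[]{ ms(6), ms(6), ms(6) })
                stopCheck += spin;                  // models pool.stopCheck.addAndGet(spin)

            // 18ms of aggregate spinning against 5ms of real time plus the 10ms head start:
            // stopCheck has overtaken the clock, so one worker should be stopped
            System.out.println("deschedule a worker: " + (now - stopCheck <= 0));   // true
        }
    }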

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
new file mode 100644
index 0000000..e03ec57
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.concurrent.SEPWorker.Work;
+
+/**
+ * A pool of worker threads that are shared between all Executors created with it. Each executor is treated as a distinct
+ * unit, with its own concurrency and task queue limits, but the threads that service the tasks on each executor are
+ * free to hop between executors at will.
+ *
+ * To keep producers from incurring unnecessary delays, once an executor is "spun up" (i.e. is processing tasks at a steady
+ * rate), adding tasks to the executor often involves only placing the task on the work queue and updating the
+ * task permits (which imposes our max queue length constraints). Only when it cannot be guaranteed the task will be serviced
+ * promptly does the producer have to signal a thread itself to perform the work.
+ *
+ * We do this by scheduling a new worker only when the task queue was empty on submission (so no running worker is
+ * guaranteed to notice the new task), or when the producer must block because the queue has reached its limit.
+ *
+ * The worker threads schedule themselves as far as possible: when they are assigned a task, they will attempt to spawn
+ * a partner worker to service any other work outstanding on the queue (if any); once they have finished the task they
+ * will either take another (if any remaining) and repeat this, or they will attempt to assign themselves to another executor
+ * that does have tasks remaining. If both fail, they enter a non-busy-spinning phase, where they sleep for a short
+ * random interval (based upon the number of threads in this mode, so that the total amount of non-sleeping time remains
+ * approximately fixed regardless of the number of spinning threads), and upon waking up will again try to assign themselves
+ * an executor with outstanding tasks to perform.
+ */
+public class SharedExecutorPool
+{
+
+    // the name assigned to workers in the pool, and the id suffix
+    final String poolName;
+    final AtomicLong workerId = new AtomicLong();
+
+    // the collection of executors serviced by this pool; periodically ordered by traffic volume
+    final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+
+    // the number of workers currently in a spinning state
+    final AtomicInteger spinningCount = new AtomicInteger();
+    // see SEPWorker.maybeStop() - used to self coordinate stopping of threads
+    final AtomicLong stopCheck = new AtomicLong();
+    // the collection of threads that are (most likely) in a spinning state - new workers are scheduled from here first
+    // TODO: consider using a queue partially-ordered by scheduled wake-up time
+    // (a full-fledged correctly ordered SkipList is overkill)
+    final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>();
+    // the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
+    final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
+
+    public SharedExecutorPool(String poolName)
+    {
+        this.poolName = poolName;
+    }
+
+    void schedule(Work work)
+    {
+        // we try to hand off our work to the spinning queue before the descheduled queue, even though we expect it to be empty;
+        // all we're doing here is hoping to find a worker without work to do, but it doesn't matter too much what we find;
+        // we atomically set the task, so even if this were a collection of all workers it would be safe, and if both queues are
+        // empty we schedule a new thread
+        Map.Entry<Long, SEPWorker> e;
+        while (null != (e = spinning.pollFirstEntry()) || null != (e = descheduled.pollFirstEntry()))
+            if (e.getValue().assign(work, false))
+                return;
+
+        if (!work.isStop())
+            new SEPWorker(workerId.incrementAndGet(), work, this);
+    }
+
+    void maybeStartSpinningWorker()
+    {
+        // in general the workers manage spinningCount directly; however if it is zero, we increment it atomically
+        // ourselves to avoid starting a worker unless we have to
+        int current = spinningCount.get();
+        if (current == 0 && spinningCount.compareAndSet(0, 1))
+            schedule(Work.SPINNING);
+    }
+}
\ No newline at end of file
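
A minimal, self-contained sketch of the "signal only when necessary" fast path the javadoc above describes. The class and method names (FastPathSketch, submit, startWorker) are illustrative and not part of this patch; the real producer-side bookkeeping lives in SEPExecutor.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative only: a stripped-down version of the producer fast path.
    final class FastPathSketch
    {
        private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
        private final AtomicInteger spinningCount = new AtomicInteger();

        void submit(Runnable task)
        {
            // common case: just enqueue; an already-spinning worker will pick the task up
            tasks.add(task);
            maybeStartSpinningWorker();
        }

        private void maybeStartSpinningWorker()
        {
            // read before CAS so the steady-state path performs no writes at all;
            // only the 0 -> 1 transition actually starts a worker
            int current = spinningCount.get();
            if (current == 0 && spinningCount.compareAndSet(0, 1))
                startWorker();
        }

        private void startWorker()
        {
            // stand-in for SharedExecutorPool.schedule(Work.SPINNING); the real pool prefers to
            // reuse a spinning or descheduled SEPWorker before creating a new thread
            new Thread(new Runnable()
            {
                public void run()
                {
                    while (true)
                    {
                        Runnable task;
                        while ((task = tasks.poll()) != null)
                            task.run();
                        spinningCount.decrementAndGet();
                        // re-check after decrementing so a task enqueued during the hand-off is not stranded
                        if (tasks.isEmpty() || !spinningCount.compareAndSet(0, 1))
                            return;
                    }
                }
            }).start();
        }
    }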

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 512d64a..303f658 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -45,10 +45,10 @@ public class StageManager
 
     static
     {
-        stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
-        stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
-        stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
-        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
+        stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
+        stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
+        stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
+        stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
         stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
         // the rest are all single-threaded
         stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
@@ -87,14 +87,9 @@ public class StageManager
                                                 stage.getJmxType());
     }
 
-    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+    private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
     {
-        return new JMXConfigurableThreadPoolExecutor(numThreads,
-                                                     KEEPALIVE,
-                                                     TimeUnit.SECONDS,
-                                                     new LinkedBlockingQueue<Runnable>(),
-                                                     new NamedThreadFactory(stage.getJmxName()),
-                                                     stage.getJmxType());
+        return JMXEnabledSharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxName(), stage.getJmxType());
     }
 
     /**
@@ -134,6 +129,11 @@ public class StageManager
             super.execute(command);
         }
 
+        public void maybeExecuteImmediately(Runnable command)
+        {
+            execute(command);
+        }
+
         @Override
         public Future<?> submit(Runnable task)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
index e5dcd7e..f580fea 100644
--- a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -30,4 +30,7 @@ public interface TracingAwareExecutorService extends ExecutorService
     // we need a way to inject a TraceState directly into the Executor context without going through
     // the global Tracing sessions; see CASSANDRA-5668
     public void execute(Runnable command, TraceState state);
+
+    // permits executing in the context of the submitting thread
+    public void maybeExecuteImmediately(Runnable command);
 }
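
The contract added here is only that implementations may (but need not) run the task on the submitting thread; the StageManager adapter above simply delegates to execute(). Below is a hypothetical illustration of that contract using a Semaphore for work permits -- it is not SEPExecutor's actual policy, which this hunk does not show.

    import java.util.concurrent.Executor;
    import java.util.concurrent.Semaphore;

    // Hypothetical sketch of the maybeExecuteImmediately() contract; not part of this patch.
    final class InlineOrQueueExecutor
    {
        private final Semaphore workPermits;
        private final Executor delegate;

        InlineOrQueueExecutor(int maxConcurrency, Executor delegate)
        {
            this.workPermits = new Semaphore(maxConcurrency);
            this.delegate = delegate;
        }

        public void maybeExecuteImmediately(Runnable command)
        {
            if (workPermits.tryAcquire())
            {
                try
                {
                    command.run();          // executed in the context of the submitting thread
                }
                finally
                {
                    workPermits.release();
                }
            }
            else
            {
                delegate.execute(command); // no spare capacity: queue as usual
            }
        }
    }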

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/SEPMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/SEPMetrics.java b/src/java/org/apache/cassandra/metrics/SEPMetrics.java
new file mode 100644
index 0000000..fbccc3b
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/SEPMetrics.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import org.apache.cassandra.concurrent.SEPExecutor;
+
+public class SEPMetrics
+{
+    /** Number of active tasks. */
+    public final Gauge<Integer> activeTasks;
+    /** Number of tasks that had blocked before being accepted (or rejected). */
+    public final Gauge<Integer> totalBlocked;
+    /**
+     * Number of tasks currently blocked, waiting to be accepted by
+     * the executor (because all threads are busy and the backing queue is full).
+     */
+    public final Gauge<Long> currentBlocked;
+    /** Number of completed tasks. */
+    public final Gauge<Long> completedTasks;
+
+    /** Number of tasks waiting to be executed. */
+    public final Gauge<Long> pendingTasks;
+
+    private MetricNameFactory factory;
+
+    /**
+     * Create metrics for the given SEPExecutor.
+     *
+     * @param executor Thread pool
+     * @param path Type of thread pool
+     * @param poolName Name of thread pool to identify metrics
+     */
+    public SEPMetrics(final SEPExecutor executor, String path, String poolName)
+    {
+        this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
+        activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return executor.getActiveCount();
+            }
+        });
+        pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getPendingTasks();
+            }
+        });
+        totalBlocked = Metrics.newGauge(factory.createMetricName("TotalBlockedTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return executor.getTotalBlockedTasks();
+            }
+        });
+        currentBlocked = Metrics.newGauge(factory.createMetricName("CurrentlyBlockedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return (long) executor.getCurrentlyBlockedTasks();
+            }
+        });
+        completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getCompletedTasks();
+            }
+        });
+    }
+
+    public void release()
+    {
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("ActiveTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("PendingTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CompletedTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalBlockedTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CurrentlyBlockedTasks"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
new file mode 100644
index 0000000..4afc4d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.core.MetricName;
+
+class ThreadPoolMetricNameFactory implements MetricNameFactory
+{
+    private final String type;
+    private final String path;
+    private final String poolName;
+
+    ThreadPoolMetricNameFactory(String type, String path, String poolName)
+    {
+        this.type = type;
+        this.path = path;
+        this.poolName = poolName;
+    }
+
+    public MetricName createMetricName(String metricName)
+    {
+        String groupName = ThreadPoolMetrics.class.getPackage().getName();
+        StringBuilder mbeanName = new StringBuilder();
+        mbeanName.append(groupName).append(":");
+        mbeanName.append("type=").append(type);
+        mbeanName.append(",path=").append(path);
+        mbeanName.append(",scope=").append(poolName);
+        mbeanName.append(",name=").append(metricName);
+
+        return new MetricName(groupName, type, metricName, path + "." + poolName, mbeanName.toString());
+    }
+}
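
For reference, an illustrative example of the names the shared factory builds; the "request"/"ReadStage" inputs are sample values, not taken from this patch.

    package org.apache.cassandra.metrics;

    import com.yammer.metrics.core.MetricName;

    // Illustrative only; this class is not part of the patch.
    class MetricNameExample
    {
        public static void main(String[] args)
        {
            ThreadPoolMetricNameFactory factory = new ThreadPoolMetricNameFactory("ThreadPools", "request", "ReadStage");
            MetricName name = factory.createMetricName("PendingTasks");
            // prints: org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks
            System.out.println(name.getMBeanName());
        }
    }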

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
index af54cdb..3cebf07 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
@@ -52,7 +52,7 @@ public class ThreadPoolMetrics
      */
     public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName)
     {
-        this.factory = new ThreadPoolMetricNameFactory(path, poolName);
+        this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
 
         activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
         {
@@ -87,30 +87,4 @@ public class ThreadPoolMetrics
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalBlockedTasks"));
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("CurrentlyBlockedTasks"));
     }
-
-    class ThreadPoolMetricNameFactory implements MetricNameFactory
-    {
-        private final String path;
-        private final String poolName;
-
-        ThreadPoolMetricNameFactory(String path, String poolName)
-        {
-            this.path = path;
-            this.poolName = poolName;
-        }
-
-        public MetricName createMetricName(String metricName)
-        {
-            String groupName = ThreadPoolMetrics.class.getPackage().getName();
-            String type = "ThreadPools";
-            StringBuilder mbeanName = new StringBuilder();
-            mbeanName.append(groupName).append(":");
-            mbeanName.append("type=").append(type);
-            mbeanName.append(",path=").append(path);
-            mbeanName.append(",scope=").append(poolName);
-            mbeanName.append(",name=").append(metricName);
-
-            return new MetricName(groupName, type, metricName, path + "." + poolName, mbeanName.toString());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e5db1d7..3e88b37 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.service.paxos.PrepareResponse;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 2e1adcb..af00403 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 /**
  * RepairJob runs repair on given ColumnFamily.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 1b6375e..507dafa 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 /**
  * Coordinates the (active) repair of a token range.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3f57e73..2c3261f 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,12 +77,12 @@ public abstract class AbstractReadExecutor
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)
     {
+        boolean readLocal = false;
         for (InetAddress endpoint : endpoints)
         {
             if (isLocalRequest(endpoint))
             {
-                logger.trace("reading data locally");
-                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+                readLocal = true;
             }
             else
             {
@@ -90,6 +90,11 @@ public abstract class AbstractReadExecutor
                 MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
             }
         }
+        if (readLocal)
+        {
+            logger.trace("reading data locally");
+            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
+        }
     }
 
     protected void makeDigestRequests(Iterable<InetAddress> endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index c75aac2..ece9289 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 64ef443..29eaadf 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d01dd99..2cbc475 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -780,6 +780,7 @@ public class StorageProxy implements StorageProxyMBean
         // only need to create a Message for non-local writes
         MessageOut<Mutation> message = null;
 
+        boolean insertLocal = false;
         for (InetAddress destination : targets)
         {
             // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
@@ -797,7 +798,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
-                    insertLocal(mutation, responseHandler);
+                    insertLocal = true;
                 }
                 else
                 {
@@ -835,6 +836,9 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        if (insertLocal)
+            insertLocal(mutation, responseHandler);
+
         if (dcGroups != null)
         {
             // for each datacenter, send the message to one node to relay the write to other replicas
@@ -944,7 +948,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
     {
-        StageManager.getStage(Stage.MUTATION).execute(new LocalMutationRunnable()
+        StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
         {
             public void runMayThrow()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 3bacad8..cce8ecc 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public class TruncateResponseHandler implements IAsyncCallback
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8e3840c..a60ab84 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -59,7 +59,7 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamManagerMBean;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 /**
  * JMX client operations for Cassandra.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0ad4312..9e8719e 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -335,7 +335,7 @@ public abstract class Message
             }
             void start()
             {
-                if (running.compareAndSet(false, true))
+                if (!running.get() && running.compareAndSet(false, true))
                 {
                     this.eventLoop.execute(this);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
index ee7d127..9cac645 100644
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -18,29 +18,24 @@
 package org.apache.cassandra.transport;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.util.concurrent.AbstractEventExecutor;
 import io.netty.util.concurrent.EventExecutorGroup;
 import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import static org.apache.cassandra.concurrent.JMXEnabledSharedExecutorPool.SHARED;
+
 public class RequestThreadPoolExecutor extends AbstractEventExecutor
 {
-
-    private final static int CORE_THREAD_TIMEOUT_SEC = 30;
-    // Number of request we accept to queue before blocking. We could allow this to be configured...
     private final static int MAX_QUEUED_REQUESTS = 128;
-
     private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
-    private final JMXEnabledThreadPoolExecutor wrapped = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
-                                                                                          CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS,
-                                                                                          new LinkedBlockingQueue<Runnable>(MAX_QUEUED_REQUESTS),
-                                                                                          new NamedThreadFactory(THREAD_FACTORY_ID),
-                                                                                          "transport");
+    private final TracingAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+                                                                           MAX_QUEUED_REQUESTS,
+                                                                           THREAD_FACTORY_ID,
+                                                                           "transport");
 
     public boolean isShuttingDown()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java
deleted file mode 100644
index 4d5f896..0000000
--- a/src/java/org/apache/cassandra/utils/SimpleCondition.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-
-// fulfils the Condition interface without spurious wakeup problems
-// (or lost notify problems either: that is, even if you call await()
-// _after_ signal(), it will work as desired.)
-public class SimpleCondition implements Condition
-{
-    private boolean set;
-
-    public synchronized void await() throws InterruptedException
-    {
-        while (!set)
-            wait();
-    }
-
-    public synchronized void reset()
-    {
-        set = false;
-    }
-
-    public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
-    {
-        long start = System.nanoTime();
-        long timeout = unit.toNanos(time);
-        long elapsed;
-        while (!set && (elapsed = System.nanoTime() - start) < timeout)
-        {
-            TimeUnit.NANOSECONDS.timedWait(this, timeout - elapsed);
-        }
-        return set;
-    }
-
-    public void signal()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public synchronized void signalAll()
-    {
-        set = true;
-        notifyAll();
-    }
-
-    public synchronized boolean isSignaled()
-    {
-        return set;
-    }
-
-    public void awaitUninterruptibly()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public long awaitNanos(long nanosTimeout) throws InterruptedException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean awaitUntil(Date deadline) throws InterruptedException
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
new file mode 100644
index 0000000..57614e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Condition;
+
+// fulfils the Condition interface without spurious wakeup problems
+// (or lost notify problems either: that is, even if you call await()
+// _after_ signal(), it will work as desired.)
+public class SimpleCondition implements Condition
+{
+    private static final AtomicReferenceFieldUpdater<SimpleCondition, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(SimpleCondition.class, WaitQueue.class, "waiting");
+
+    private volatile WaitQueue waiting;
+    private volatile boolean signaled = false;
+
+    public void await() throws InterruptedException
+    {
+        if (isSignaled())
+            return;
+        if (waiting == null)
+            waitingUpdater.compareAndSet(this, null, new WaitQueue());
+        WaitQueue.Signal s = waiting.register();
+        if (isSignaled())
+            s.cancel();
+        else
+            s.await();
+        assert isSignaled();
+    }
+
+    public boolean await(long time, TimeUnit unit) throws InterruptedException
+    {
+        if (isSignaled())
+            return true;
+        long start = System.nanoTime();
+        long until = start + unit.toNanos(time);
+        if (waiting == null)
+            waitingUpdater.compareAndSet(this, null, new WaitQueue());
+        WaitQueue.Signal s = waiting.register();
+        if (isSignaled())
+        {
+            s.cancel();
+            return true;
+        }
+        return s.awaitUntil(until) || isSignaled();
+    }
+
+    public void signal()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isSignaled()
+    {
+        return signaled;
+    }
+
+    public void signalAll()
+    {
+        signaled = true;
+        if (waiting != null)
+            waiting.signalAll();
+    }
+
+    public void awaitUninterruptibly()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public long awaitNanos(long nanosTimeout) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean awaitUntil(Date deadline) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+}
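
A small usage sketch of the new SimpleCondition (not part of the patch): because the signal is latched in the volatile field, signalAll() may complete before await() is even called and the waiter still returns promptly.

    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.utils.concurrent.SimpleCondition;

    // Usage sketch only; this class is not part of the patch.
    public class SimpleConditionExample
    {
        public static void main(String[] args) throws InterruptedException
        {
            final SimpleCondition done = new SimpleCondition();

            Thread worker = new Thread(new Runnable()
            {
                public void run()
                {
                    // ... perform some work ...
                    done.signalAll();   // latches the signal and wakes any current or future waiter
                }
            });
            worker.start();

            // returns true immediately if the signal already fired; otherwise blocks until it does (or times out)
            if (!done.await(10, TimeUnit.SECONDS))
                throw new AssertionError("timed out waiting for worker");
            worker.join();
        }
    }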

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
index 8d072ea..2322210 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
 
@@ -73,8 +74,6 @@ import java.util.concurrent.locks.LockSupport;
 public final class WaitQueue
 {
 
-    private static final Logger logger = LoggerFactory.getLogger(WaitQueue.class);
-
     private static final int CANCELLED = -1;
     private static final int SIGNALLED = 1;
     private static final int NOT_SET = 0;
@@ -82,11 +81,11 @@ public final class WaitQueue
     private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state");
 
     // the waiting signals
-    private final ConcurrentLinkedDeque<RegisteredSignal> queue = new ConcurrentLinkedDeque<>();
+    private final ConcurrentLinkedQueue<RegisteredSignal> queue = new ConcurrentLinkedQueue<>();
 
     /**
      * The calling thread MUST be the thread that uses the signal
-     * @return
+     * @return a Signal that the registering thread can wait on
      */
     public Signal register()
     {
@@ -119,7 +118,7 @@ public final class WaitQueue
         while (true)
         {
             RegisteredSignal s = queue.poll();
-            if (s == null || s.signal())
+            if (s == null || s.signal() != null)
                 return s != null;
         }
     }
@@ -129,42 +128,40 @@ public final class WaitQueue
      */
     public void signalAll()
     {
-        RegisteredSignal last = queue.peekLast();
-        if (last == null)
+        if (!hasWaiters())
             return;
-        List<Thread> woke = null;
-        if (logger.isTraceEnabled())
-            woke = new ArrayList<>();
-        long start = System.nanoTime();
-        // we wake up only a snapshot of the queue, to avoid a race where the condition is not met and the woken thread
-        // immediately waits on the queue again
+
+        // to avoid a race where the condition is not met and a woken thread manages to wait on the queue again before
+        // we finish signalling everyone, we pick a random thread we have woken up and hold onto it, so that if we encounter
+        // it again we know we're looping. We reselect a random thread periodically, progressively less often.
+        // the "correct" solution to this problem is to use a queue that permits snapshot iteration, but this solution is sufficient
+        int i = 0, s = 5;
+        Thread randomThread = null;
         Iterator<RegisteredSignal> iter = queue.iterator();
         while (iter.hasNext())
         {
             RegisteredSignal signal = iter.next();
-            if (logger.isTraceEnabled())
+            Thread signalled = signal.signal();
+
+            if (signalled != null)
             {
-                Thread thread = signal.thread;
-                if (signal.signal())
-                    woke.add(thread);
+                if (signalled == randomThread)
+                    break;
+
+                if (++i == s)
+                {
+                    randomThread = signalled;
+                    s <<= 1;
+                }
             }
-            else
-                signal.signal();
 
             iter.remove();
-
-            if (signal == last)
-                break;
         }
-        long end = System.nanoTime();
-        if (woke != null)
-            logger.trace("Woke up {} in {}ms from {}", woke, (end - start) * 0.000001d, Thread.currentThread().getStackTrace()[2]);
     }
 
     private void cleanUpCancelled()
     {
-        // attempt to remove the cancelled from the beginning only, but if we fail to remove any proceed to cover
-        // the whole list
+        // TODO: attempt to remove the cancelled from the beginning only (need atomic cas of head)
         Iterator<RegisteredSignal> iter = queue.iterator();
         while (iter.hasNext())
         {
@@ -185,7 +182,7 @@ public final class WaitQueue
      */
     public int getWaiting()
     {
-        if (queue.isEmpty())
+        if (!hasWaiters())
             return 0;
         Iterator<RegisteredSignal> iter = queue.iterator();
         int count = 0;
@@ -264,11 +261,11 @@ public final class WaitQueue
          * isSignalled() will be true on exit, and the method will return true; if timedout, the method will return
          * false and isCancelled() will be true; if interrupted an InterruptedException will be thrown and isCancelled()
          * will be true.
-         * @param until System.currentTimeMillis() to wait until
+         * @param nanos System.nanoTime() to wait until
          * @return true if signalled, false if timed out
          * @throws InterruptedException
          */
-        public boolean awaitUntil(long until) throws InterruptedException;
+        public boolean awaitUntil(long nanos) throws InterruptedException;
     }
 
     /**
@@ -302,10 +299,12 @@ public final class WaitQueue
 
         public boolean awaitUntil(long until) throws InterruptedException
         {
-            while (until < System.currentTimeMillis() && !isSignalled())
+            long now;
+            while (until > (now = System.nanoTime()) && !isSignalled())
             {
                 checkInterrupted();
-                LockSupport.parkUntil(until);
+                long delta = until - now;
+                LockSupport.parkNanos(delta);
             }
             return checkAndClear();
         }
@@ -343,15 +342,16 @@ public final class WaitQueue
             return state != NOT_SET;
         }
 
-        private boolean signal()
+        private Thread signal()
         {
             if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED))
             {
+                Thread thread = this.thread;
                 LockSupport.unpark(thread);
-                thread = null;
-                return true;
+                this.thread = null;
+                return thread;
             }
-            return false;
+            return null;
         }
 
         public boolean checkAndClear()