You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/05/29 23:32:14 UTC
[2/3] git commit: More efficient executor service for fast operations
More efficient executor service for fast operations
Patch by Benedict Elliott Smith; reviewed by Jason Brown for CASSANDRA-4718
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5420b7a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5420b7a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5420b7a2
Branch: refs/heads/trunk
Commit: 5420b7a2296d230e7fd5bc2f41fc6472a9c8b55e
Parents: a8d18c9
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Thu May 29 22:16:03 2014 +0100
Committer: belliottsmith <gi...@sub.laerad.com>
Committed: Thu May 29 22:20:21 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../AbstractTracingAwareExecutorService.java | 227 ++++++++++++
.../DebuggableThreadPoolExecutor.java | 5 +
.../JMXEnabledSharedExecutorPool.java | 114 ++++++
.../cassandra/concurrent/SEPExecutor.java | 274 ++++++++++++++
.../apache/cassandra/concurrent/SEPWorker.java | 368 +++++++++++++++++++
.../concurrent/SharedExecutorPool.java | 99 +++++
.../cassandra/concurrent/StageManager.java | 22 +-
.../concurrent/TracingAwareExecutorService.java | 3 +
.../apache/cassandra/metrics/SEPMetrics.java | 98 +++++
.../metrics/ThreadPoolMetricNameFactory.java | 47 +++
.../cassandra/metrics/ThreadPoolMetrics.java | 28 +-
.../apache/cassandra/net/MessagingService.java | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 2 +-
.../apache/cassandra/repair/RepairSession.java | 1 +
.../cassandra/service/AbstractReadExecutor.java | 9 +-
.../service/AbstractWriteResponseHandler.java | 2 +-
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 8 +-
.../service/TruncateResponseHandler.java | 2 +-
.../org/apache/cassandra/tools/NodeProbe.java | 2 +-
.../org/apache/cassandra/transport/Message.java | 2 +-
.../transport/RequestThreadPoolExecutor.java | 19 +-
.../apache/cassandra/utils/SimpleCondition.java | 84 -----
.../utils/concurrent/SimpleCondition.java | 97 +++++
.../cassandra/utils/concurrent/WaitQueue.java | 72 ++--
.../concurrent/LongSharedExecutorPoolTest.java | 228 ++++++++++++
.../apache/cassandra/repair/ValidatorTest.java | 2 +-
28 files changed, 1638 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f313278..3b09b7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc1
+ * More efficient executor service for fast operations (CASSANDRA-4718)
* Move less common tools into a new cassandra-tools package (CASSANDRA-7160)
* Support more concurrent requests in native protocol (CASSANDRA-7231)
* Add tab-completion to debian nodetool packaging (CASSANDRA-6421)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
new file mode 100644
index 0000000..544e8a7
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.tracing.Tracing.isTracing;
+
+/**
+ * Base class for executors built on this class's own lightweight {@link FutureTask}. Tasks submitted while the
+ * submitting thread is tracing (or together with an explicit {@link TraceState}) run inside that trace session.
+ * Subclasses decide how tasks are queued and run via {@link #addTask(FutureTask)}, and are told about every
+ * finished task via {@link #onCompletion()}. The bulk invokeAll/invokeAny operations are not supported.
+ */
+public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class);
+
+ // hand a task created by this executor to the subclass for execution
+ protected abstract void addTask(FutureTask<?> futureTask);
+ // called by FutureTask.run() after each task has finished, whether or not it threw
+ protected abstract void onCompletion();
+
+ /** Task Submission / Creation / Objects **/
+
+ public <T> FutureTask<T> submit(Callable<T> task)
+ {
+ return submit(newTaskFor(task));
+ }
+
+ public FutureTask<?> submit(Runnable task)
+ {
+ return submit(newTaskFor(task, null));
+ }
+
+ public <T> FutureTask<T> submit(Runnable task, T result)
+ {
+ return submit(newTaskFor(task, result));
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+ {
+ return newTaskFor(runnable, result, null);
+ }
+
+ /**
+ * Wraps a runnable as a FutureTask. If an explicit traceState is given, or the calling thread is currently
+ * tracing, the task runs inside that trace session (an explicit state takes precedence over the caller's).
+ * Runnables that are already suitable tasks are returned unchanged.
+ */
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState)
+ {
+ if (traceState != null || isTracing())
+ {
+ if (runnable instanceof TraceSessionFutureTask)
+ return (TraceSessionFutureTask<T>) runnable;
+ // without an explicit state, capture the caller's session; passing null here would make the task
+ // run with tracing cleared instead of in the session it was submitted from
+ TraceState state = traceState != null ? traceState : Tracing.instance.get();
+ return new TraceSessionFutureTask<T>(runnable, result, state);
+ }
+ if (runnable instanceof FutureTask)
+ return (FutureTask<T>) runnable;
+ return new FutureTask<>(runnable, result);
+ }
+
+ /**
+ * Wraps a callable as a FutureTask, running it inside the calling thread's trace session if it is tracing.
+ */
+ protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+ {
+ if (isTracing())
+ {
+ if (callable instanceof TraceSessionFutureTask)
+ return (TraceSessionFutureTask<T>) callable;
+ // capture the submitting thread's session; a null state would clear tracing while the task runs
+ return new TraceSessionFutureTask<T>(callable, Tracing.instance.get());
+ }
+ // test for our own FutureTask (the type we cast to), not java.util.concurrent.FutureTask, which would fail the cast
+ if (callable instanceof FutureTask)
+ return (FutureTask<T>) callable;
+ return new FutureTask<>(callable);
+ }
+
+ // a FutureTask that installs the given trace state on the running thread for the duration of the task,
+ // and clears it again afterwards
+ private class TraceSessionFutureTask<T> extends FutureTask<T>
+ {
+ private final TraceState state;
+
+ public TraceSessionFutureTask(Callable<T> callable, TraceState state)
+ {
+ super(callable);
+ this.state = state;
+ }
+
+ public TraceSessionFutureTask(Runnable runnable, T result, TraceState state)
+ {
+ super(runnable, result);
+ this.state = state;
+ }
+
+ public void run()
+ {
+ Tracing.instance.set(state);
+ try
+ {
+ super.run();
+ }
+ finally
+ {
+ Tracing.instance.set(null);
+ }
+ }
+ }
+
+ /**
+ * Minimal Future/Runnable: run() executes the callable once, records its result or the Throwable it threw,
+ * then signals waiters and calls {@link #onCompletion()}. Cancellation is not supported.
+ */
+ class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+ {
+ // set when the callable threw; result then holds the Throwable
+ private boolean failure;
+ // the callable's return value (or Throwable on failure); only read after await() reports completion
+ private Object result = this;
+ private final Callable<T> callable;
+
+ public FutureTask(Callable<T> callable)
+ {
+ this.callable = callable;
+ }
+ public FutureTask(Runnable runnable, T result)
+ {
+ this(Executors.callable(runnable, result));
+ }
+
+ public void run()
+ {
+ try
+ {
+ result = callable.call();
+ }
+ catch (Throwable t)
+ {
+ // pass t as the trailing argument (not to a {} placeholder) so that SLF4J logs its stack trace
+ logger.warn("Uncaught exception on thread {}", Thread.currentThread(), t);
+ result = t;
+ failure = true;
+ }
+ finally
+ {
+ signalAll();
+ onCompletion();
+ }
+ }
+
+ // cancellation is not supported: always returns false
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return isSignaled();
+ }
+
+ public T get() throws InterruptedException, ExecutionException
+ {
+ await();
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ // await returns false on timeout; without this check we would hand back a result that was never set
+ if (!await(timeout, unit))
+ throw new TimeoutException();
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+ }
+
+ // queue the task via the subclass and hand it back to the caller as its Future
+ private <T> FutureTask<T> submit(FutureTask<T> task)
+ {
+ addTask(task);
+ return task;
+ }
+
+ public void execute(Runnable command)
+ {
+ addTask(newTaskFor(command, null));
+ }
+
+ // execute the command inside the given trace session (or the caller's, if state is null and it is tracing)
+ public void execute(Runnable command, TraceState state)
+ {
+ addTask(newTaskFor(command, null, state));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 665f0b0..cd9e9e4 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -138,6 +138,11 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
: new TraceSessionWrapper<Object>(command, state));
}
+ /**
+ * No separate fast path here: the command is simply passed on to {@link #execute(Runnable)}.
+ */
+ public void maybeExecuteImmediately(Runnable command)
+ {
+ this.execute(command);
+ }
+
// execute does not call newTaskFor
@Override
public void execute(Runnable command)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
new file mode 100644
index 0000000..d70e524
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java
@@ -0,0 +1,114 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.metrics.SEPMetrics;
+
+/**
+ * A {@link SharedExecutorPool} whose executors register themselves as JMX MBeans and report {@link SEPMetrics}.
+ */
+public class JMXEnabledSharedExecutorPool extends SharedExecutorPool
+{
+
+ // the process-wide shared pool, named "SharedPool"
+ public static final JMXEnabledSharedExecutorPool SHARED = new JMXEnabledSharedExecutorPool("SharedPool");
+
+ public JMXEnabledSharedExecutorPool(String poolName)
+ {
+ super(poolName);
+ }
+
+ public interface JMXEnabledSEPExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
+ {
+ }
+
+ /**
+ * An SEPExecutor on this pool that registers itself as the MBean "org.apache.cassandra.<jmxPath>:type=<name>"
+ * when constructed, and unregisters itself (releasing its metrics) the first time it is shut down.
+ */
+ public class JMXEnabledSEPExecutor extends SEPExecutor implements JMXEnabledSEPExecutorMBean
+ {
+
+ private final SEPMetrics metrics;
+ private final String mbeanName;
+
+ public JMXEnabledSEPExecutor(int poolSize, int maxQueuedLength, String name, String jmxPath)
+ {
+ super(JMXEnabledSharedExecutorPool.this, poolSize, maxQueuedLength);
+ metrics = new SEPMetrics(this, jmxPath, name);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + name;
+
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ // construction is failing, so nobody will ever call shutdown() to release the metrics created above
+ metrics.release();
+ throw new RuntimeException(e);
+ }
+ }
+
+ // unregister the MBean and release the metrics; the metrics are released even if JMX unregistration fails
+ private void unregisterMBean()
+ {
+ try
+ {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ // release metrics
+ metrics.release();
+ }
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ // synchronized (as SEPExecutor.shutdown is) so that the isShutdown() check and the unregistration
+ // happen at most once even when shutdown is called concurrently
+ try
+ {
+ if (!isShutdown())
+ unregisterMBean();
+ }
+ finally
+ {
+ // always shut the executor down, even if JMX cleanup failed
+ super.shutdown();
+ }
+ }
+
+ // there are no core threads: workers are borrowed from the shared pool on demand
+ public int getCoreThreads()
+ {
+ return 0;
+ }
+
+ public void setCoreThreads(int number)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setMaximumThreads(int number)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // creates a JMX-registered executor backed by this pool and adds it to the pool's executor list
+ public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String name, String jmxPath)
+ {
+ JMXEnabledSEPExecutor executor = new JMXEnabledSEPExecutor(maxConcurrency, maxQueuedTasks, name, jmxPath);
+ executors.add(executor);
+ return executor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
new file mode 100644
index 0000000..bdc0045
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -0,0 +1,274 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.concurrent.SEPWorker.Work;
+
+/**
+ * An executor that owns no threads of its own: its queued tasks are run by the workers of a
+ * {@link SharedExecutorPool}. At most maxWorkers of those workers may run this executor's tasks at once
+ * ("work permits"), and producers block once the queue grows beyond maxTasksQueued ("task permits"); both
+ * counts are packed into the single permits word so they can be read and updated together with one CAS.
+ */
+public class SEPExecutor extends AbstractTracingAwareExecutorService
+{
+ // the shared pool whose workers run this executor's tasks
+ private final SharedExecutorPool pool;
+
+ // maximum number of pool workers that may run this executor's tasks concurrently
+ private final int maxWorkers;
+ // queue-length threshold: a producer whose enqueue takes the queued count above this blocks in addTask
+ private final int maxTasksQueued;
+
+ // stores both a set of work permits and task permits, so that both can be changed by a single CAS:
+ // bottom 32 bits are number of queued tasks (initially 0); normally in the range [0..maxTasksQueued], but addTask
+ // counts its task before blocking, so it can exceed maxTasksQueued by the number of currently blocked producers
+ // top 32 bits are number of work permits available in the range [0..maxWorkers] (initially maxWorkers)
+ private final AtomicLong permits = new AtomicLong();
+
+ // producers wait on this when there is no room on the queue
+ private final WaitQueue hasRoom = new WaitQueue();
+ // blocking statistics: producers that have ever blocked in addTask, and those blocked right now
+ private final AtomicLong totalBlocked = new AtomicLong();
+ private final AtomicInteger currentlyBlocked = new AtomicInteger();
+ // number of tasks that have finished running (see onCompletion)
+ private final AtomicLong completedTasks = new AtomicLong();
+
+ // set by shutdown(); SEPWorker checks it after each task and stops taking tasks from this executor once set
+ volatile boolean shuttingDown = false;
+ // signalled once shutdown has been requested and no work permits are held (getActiveCount() == 0)
+ final SimpleCondition shutdown = new SimpleCondition();
+
+ // TODO: see if other queue implementations might improve throughput
+ protected final ConcurrentLinkedQueue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();
+
+ SEPExecutor(SharedExecutorPool pool, int maxWorkers, int maxTasksQueued)
+ {
+ this.pool = pool;
+ this.maxWorkers = maxWorkers;
+ this.maxTasksQueued = maxTasksQueued;
+ // no tasks queued yet, every work permit available
+ this.permits.set(combine(0, maxWorkers));
+ }
+
+ // invoked from FutureTask.run() after every task, whether it completed normally or threw
+ protected void onCompletion()
+ {
+ completedTasks.incrementAndGet();
+ }
+
+ // schedules another worker for this executor if there is work outstanding and there are no spinning threads that
+ // will self-assign to it in the immediate future; the scheduled Work carries one work permit and one task permit.
+ // returns true iff a worker was scheduled
+ boolean maybeSchedule()
+ {
+ if (pool.spinningCount.get() > 0 || !takeWorkPermit(true))
+ return false;
+
+ pool.schedule(new Work(this));
+ return true;
+ }
+
+ // enqueues the task and counts a task permit for it. If the queue was empty beforehand, asks the pool to start a
+ // spinning worker; if this enqueue took the queue above maxTasksQueued, blocks (uninterruptibly) until signalled
+ // that room is available
+ protected void addTask(FutureTask<?> task)
+ {
+ // we add to the queue first, so that when a worker takes a task permit it can be certain there is a task available
+ // this permits us to schedule threads non-spuriously; it also means work is serviced fairly
+ tasks.add(task);
+ int taskPermits;
+ while (true)
+ {
+ long current = permits.get();
+ taskPermits = taskPermits(current);
+ // because there is no difference in practical terms between the task permit being added or not (the work is already in existence)
+ // we always add our permit, but block after the fact if we breached the queue limit
+ if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits + 1)))
+ break;
+ }
+
+ // note: taskPermits is the queued count observed *before* our increment
+ if (taskPermits == 0)
+ {
+ // we only need to schedule a thread if there are no tasks already waiting to be processed, as
+ // the original enqueue will have started a thread to service its work which will have itself
+ // spawned helper workers that would have either exhausted the available tasks or are still being spawned.
+ // to avoid incurring any unnecessary signalling penalties we also do not take any work to hand to the new
+ // worker, we simply start a worker in a spinning state
+ pool.maybeStartSpinningWorker();
+ }
+ else if (taskPermits >= maxTasksQueued)
+ {
+ // register to receive a signal once a task is processed bringing the queue below its threshold
+ WaitQueue.Signal s = hasRoom.register();
+
+ // we will only be signalled once the queue drops below full, so this creates equivalent external behaviour
+ // however the advantage is that we never wake-up spuriously;
+ // we choose to always sleep, even if in the intervening time the queue has dropped below limit,
+ // so long as we _will_ eventually receive a signal
+ if (taskPermits(permits.get()) > maxTasksQueued)
+ {
+ // if we're blocking, we might as well directly schedule a worker if we aren't already at max
+ if (takeWorkPermit(true))
+ pool.schedule(new Work(this));
+ totalBlocked.incrementAndGet();
+ currentlyBlocked.incrementAndGet();
+ s.awaitUninterruptibly();
+ currentlyBlocked.decrementAndGet();
+ }
+ else // don't propagate our signal when we cancel, just cancel
+ s.cancel();
+ }
+ }
+
+ // takes permission to perform a task, if any are available; once taken it is guaranteed
+ // that a subsequent call to tasks.poll() will return some work.
+ // if this take brings the queued count down from maxTasksQueued, producers blocked in addTask are woken
+ boolean takeTaskPermit()
+ {
+ while (true)
+ {
+ long current = permits.get();
+ int taskPermits = taskPermits(current);
+ if (taskPermits == 0)
+ return false;
+ if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits - 1)))
+ {
+ if (taskPermits == maxTasksQueued && hasRoom.hasWaiters())
+ hasRoom.signalAll();
+ return true;
+ }
+ }
+ }
+
+ // takes a worker permit and (optionally) a task permit simultaneously; if one of the two is unavailable, returns false
+ // NOTE(review): a task permit must be *available* (taskPermits > 0) even when takeTaskPermit is false, so this
+ // fails whenever the queue is empty; that is what sends maybeExecuteImmediately to addTask on an empty queue -
+ // confirm this is intended
+ boolean takeWorkPermit(boolean takeTaskPermit)
+ {
+ int taskDelta = takeTaskPermit ? 1 : 0;
+ while (true)
+ {
+ long current = permits.get();
+ int workPermits = workPermits(current);
+ int taskPermits = taskPermits(current);
+ if (workPermits == 0 || taskPermits == 0)
+ return false;
+ if (permits.compareAndSet(current, combine(taskPermits - taskDelta, workPermits - 1)))
+ {
+ if (takeTaskPermit && taskPermits == maxTasksQueued && hasRoom.hasWaiters())
+ hasRoom.signalAll();
+ return true;
+ }
+ }
+ }
+
+ // gives up a work permit (callers are responsible for any follow-up scheduling or shutdown signalling)
+ void returnWorkPermit()
+ {
+ while (true)
+ {
+ long current = permits.get();
+ int workPermits = workPermits(current);
+ if (permits.compareAndSet(current, updateWorkPermits(current, workPermits + 1)))
+ return;
+ }
+ }
+
+ // tries to run the command on the calling thread while holding a work permit; if takeWorkPermit(false) fails,
+ // the command is queued via addTask instead (see the NOTE on takeWorkPermit about the empty-queue case)
+ public void maybeExecuteImmediately(Runnable command)
+ {
+ FutureTask<?> ft = newTaskFor(command, null);
+ if (!takeWorkPermit(false))
+ {
+ addTask(ft);
+ }
+ else
+ {
+ try
+ {
+ ft.run();
+ }
+ finally
+ {
+ returnWorkPermit();
+ // we have to maintain our invariant of always scheduling after any work is performed
+ // in this case in particular we are not processing the rest of the queue anyway, and so
+ // the work permit may go wasted if we don't immediately attempt to spawn another worker
+ maybeSchedule();
+ }
+ }
+ }
+
+ // marks this executor as shutting down and removes it from the pool's executor list (so workers scanning that
+ // list no longer find it); signals termination immediately if no work permits are held, otherwise a worker that
+ // observes shuttingDown signals it once getActiveCount() reaches 0.
+ // NOTE(review): addTask does not check shuttingDown, so tasks submitted after shutdown may never run - confirm
+ public synchronized void shutdown()
+ {
+ shuttingDown = true;
+ pool.executors.remove(this);
+ if (getActiveCount() == 0)
+ shutdown.signalAll();
+ }
+
+ // shuts down, then drains and returns every queued task (taking a task permit for each so the counts stay consistent)
+ public synchronized List<Runnable> shutdownNow()
+ {
+ shutdown();
+ List<Runnable> aborted = new ArrayList<>();
+ while (takeTaskPermit())
+ aborted.add(tasks.poll());
+ return aborted;
+ }
+
+ // true once shutdown() has been called
+ public boolean isShutdown()
+ {
+ return shuttingDown;
+ }
+
+ // true once shutdown() has been called and the shutdown condition has been signalled
+ public boolean isTerminated()
+ {
+ return shuttingDown && shutdown.isSignaled();
+ }
+
+ // waits up to the given timeout for the shutdown condition, then reports isTerminated()
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ shutdown.await(timeout, unit);
+ return isTerminated();
+ }
+
+ // number of queued tasks, as tracked by the task-permit half of permits
+ public long getPendingTasks()
+ {
+ return taskPermits(permits.get());
+ }
+
+ // number of tasks finished so far (see onCompletion)
+ public long getCompletedTasks()
+ {
+ return completedTasks.get();
+ }
+
+ // number of work permits currently held: pool workers, or maybeExecuteImmediately callers, running our tasks
+ public int getActiveCount()
+ {
+ return maxWorkers - workPermits(permits.get());
+ }
+
+ // the underlying counter is a long; this narrows it to int
+ public int getTotalBlockedTasks()
+ {
+ return (int) totalBlocked.get();
+ }
+
+ public int getMaximumThreads()
+ {
+ return maxWorkers;
+ }
+
+ public int getCurrentlyBlockedTasks()
+ {
+ return currentlyBlocked.get();
+ }
+
+ // packing helpers for the permits word: task permits in the low 32 bits, work permits in the high 32 bits.
+ // the int arguments are assumed non-negative: a negative taskPermits would sign-extend when widened to long
+ // and overwrite the work-permit half in updateTaskPermits/combine
+ private static int taskPermits(long both)
+ {
+ return (int) both;
+ }
+
+ private static int workPermits(long both)
+ {
+ return (int) (both >>> 32);
+ }
+
+ private static long updateTaskPermits(long prev, int taskPermits)
+ {
+ return (prev & (-1L << 32)) | taskPermits;
+ }
+
+ private static long updateWorkPermits(long prev, int workPermits)
+ {
+ return (((long) workPermits) << 32) | (prev & (-1L >>> 32));
+ }
+
+ private static long combine(int taskPermits, int workPermits)
+ {
+ return (((long) workPermits) << 32) | taskPermits;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
new file mode 100644
index 0000000..084bba7
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -0,0 +1,368 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
+{
+ private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class);
+
+ // identifies this worker: used in its thread name and as its key in pool.descheduled
+ final Long workerId;
+ // the daemon thread, created and started by the constructor, that executes run()
+ final Thread thread;
+ // the pool this worker serves: the source of executors to work on and of the spinning/descheduled bookkeeping
+ final SharedExecutorPool pool;
+
+ // prevStopCheck stores the value of pool.stopCheck after we last incremented it; if it hasn't changed,
+ // we know nobody else was spinning in the interval, so we increment our soleSpinnerSpinTime accordingly,
+ // and otherwise we set it to zero; this is then used to terminate the final spinning thread, as the coordinated
+ // strategy can only work when there are multiple threads spinning (as more sleep time must elapse than real time)
+ // NOTE(review): these are plain (non-volatile) fields, but stopSpinning() resets them and can be reached via
+ // assign(), which takes a `self` flag and so presumably also runs on other threads - confirm the race is benign
+ long prevStopCheck = 0;
+ long soleSpinnerSpinTime = 0;
+
+ // creates the worker in initialState and immediately starts its daemon thread, named "<poolName>-Worker-<workerId>"
+ SEPWorker(Long workerId, Work initialState, SharedExecutorPool pool)
+ {
+ this.workerId = workerId;
+ this.pool = pool;
+ // the initial state must be in place before the thread starts and reads it
+ set(initialState);
+ String threadName = pool.poolName + "-Worker-" + workerId;
+ this.thread = new Thread(this, threadName);
+ this.thread.setDaemon(true);
+ this.thread.start();
+ }
+
+ // worker main loop: sleep-spin while idle, park while STOPPED, and otherwise drain tasks from the executor we are
+ // assigned to. On any unexpected Throwable the worker moves itself to STOPPED, returns any work permit it still
+ // holds, logs the error, and returns, ending its thread.
+ public void run()
+ {
+ /**
+ * we maintain two important invariants:
+ * 1) after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed
+ * promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to
+ * avoid signalling if there are _any_ spinning threads. we achieve this by simply calling maybeSchedule()
+ * on each queue if on decrementing the spin counter we hit zero.
+ * 2) before processing a task on a given queue, we attempt to assign another worker to the _same queue only_;
+ * this allows a producer to skip signalling work if the task queue is currently non-empty, and in conjunction
+ * with invariant (1) ensures that if any thread was spinning when a task was added to any executor, that
+ * task will be processed immediately if work permits are available
+ */
+
+ SEPExecutor assigned = null;
+ Runnable task = null;
+ try
+ {
+ while (true)
+ {
+ if (isSpinning() && !selfAssign())
+ {
+ doWaitSpin();
+ continue;
+ }
+
+ // if stop was signalled, go to sleep (don't try self-assign; being put to sleep is rare, so let's obey it
+ // whenever we receive it - though we don't apply this constraint to producers, who may reschedule us before
+ // we go to sleep)
+ if (stop())
+ while (isStopped())
+ LockSupport.park();
+
+ // we can be assigned any state from STOPPED, so loop if we don't actually have any tasks assigned
+ assigned = get().assigned;
+ if (assigned == null)
+ continue;
+ task = assigned.tasks.poll();
+
+ // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
+ // (which is also a state that will never be interrupted externally)
+ set(Work.WORKING);
+ boolean shutdown;
+ while (true)
+ {
+ // before we process any task, we maybe schedule a new worker _to our executor only_; this
+ // ensures that even once all spinning threads have found work, if more work is left to be serviced
+ // and permits are available, it will be dealt with immediately.
+ assigned.maybeSchedule();
+
+ // we hold a task permit for this task (taken together with our work permit when we were assigned,
+ // or by takeTaskPermit() below), so the poll() that fetched it cannot have returned null
+ task.run();
+ task = null;
+
+ // if we're shutting down, or we fail to take a permit, we don't perform any more work
+ if ((shutdown = assigned.shuttingDown) || !assigned.takeTaskPermit())
+ break;
+ task = assigned.tasks.poll();
+ }
+
+ // return our work permit, and maybe signal shutdown
+ assigned.returnWorkPermit();
+ if (shutdown && assigned.getActiveCount() == 0)
+ assigned.shutdown.signalAll();
+ assigned = null;
+
+ // try to immediately reassign ourselves some work; if we fail, start spinning
+ if (!selfAssign())
+ startSpinning();
+ }
+ }
+ catch (Throwable t)
+ {
+ // move ourselves to STOPPED; if work was assigned to us concurrently, take over its executor first so that
+ // its work permit is returned below. retry until assign() succeeds
+ while (true)
+ {
+ if (get().assigned != null)
+ {
+ assigned = get().assigned;
+ set(Work.WORKING);
+ }
+ if (assign(Work.STOPPED, true))
+ break;
+ }
+ if (assigned != null)
+ assigned.returnWorkPermit();
+ // NOTE(review): with SLF4J 1.x a Throwable consumed by a {} placeholder is formatted via toString() only,
+ // so no stack trace is logged; consider passing t as an extra trailing argument instead
+ if (task != null)
+ logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
+ else
+ logger.error("Unexpected exception killed worker: {}", t);
+ }
+ }
+
+ // try to assign this worker the provided work.
+ // whether the current state accepts an assignment is decided by Work.canAssign(self) (the original author lists
+ // SPINNING, STOP_SIGNALLED and (ASSIGNED); the unpark branch below shows STOPPED can be assigned too).
+ // restores invariants of the various states (e.g. spinningCount, descheduled collection and thread park status)
+ // returns true if our state was replaced by work; false if the state does not (or no longer) accept assignment
+ boolean assign(Work work, boolean self)
+ {
+ Work state = get();
+ while (state.canAssign(self))
+ {
+ if (!compareAndSet(state, work))
+ {
+ // lost a race with another state change; re-read and re-check whether assignment is still allowed
+ state = get();
+ continue;
+ }
+ // if we were spinning, exit the state (decrement the count); this is valid even if we are already spinning,
+ // as the assigning thread will have incremented the spinningCount
+ if (state.isSpinning())
+ stopSpinning();
+
+ // if we're being descheduled, place ourselves in the descheduled collection
+ if (work.isStop())
+ pool.descheduled.put(workerId, this);
+
+ // if we're currently stopped, and the new state is not a stop signal
+ // (which we can immediately convert to stopped), unpark the worker
+ if (state.isStopped() && (!work.isStop() || !stop()))
+ LockSupport.unpark(thread);
+ return true;
+ }
+ return false;
+ }
+
+ // scans the pool's executors in order and claims the first that has both a free work permit and a queued task.
+ // returns true if this worker now has work (the claimed executor, or work another thread assigned us meanwhile)
+ private boolean selfAssign()
+ {
+ // our current state does not allow us to pick up work ourselves
+ boolean permitted = get().canAssign(true);
+ if (!permitted)
+ return false;
+ for (SEPExecutor candidate : pool.executors)
+ {
+ if (!candidate.takeWorkPermit(true))
+ continue;
+ // the permits are now ours and must not be lost: keep the work if we can, otherwise hand it on
+ Work claimed = new Work(candidate);
+ if (!assign(claimed, true))
+ {
+ // failing to assign means someone else already gave us work, so pass this work to another worker
+ pool.schedule(claimed);
+ assert get().assigned != null;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ // moves this worker from WORKING to SPINNING. Only legal while WORKING, a state no other thread changes, so a
+ // plain set() suffices instead of a CAS. The pool-wide spinningCount is raised before the new state is published.
+ private void startSpinning()
+ {
+ assert get() == Work.WORKING;
+ pool.spinningCount.getAndIncrement();
+ set(Work.SPINNING);
+ }
+
+ // leaves the spinning state. If we were the last spinner, give every executor a chance to schedule a worker, so
+ // producers that skipped waking a worker because they saw a spinner are still serviced (invariant (1) in run())
+ private void stopSpinning()
+ {
+ long stillSpinning = pool.spinningCount.decrementAndGet();
+ if (stillSpinning == 0)
+ {
+ for (SEPExecutor executor : pool.executors)
+ executor.maybeSchedule();
+ }
+ // reset the sole-spinner bookkeeping used by doWaitSpin and maybeStop
+ soleSpinnerSpinTime = 0;
+ prevStopCheck = 0;
+ }
+
+ // perform one sleep-spin, adding the time actually spent to pool.stopCheck and letting maybeStop decide
+ // whether a worker should be descheduled; all times here are in nanoseconds
+ private void doWaitSpin()
+ {
+ // pick a random sleep interval based on the number of threads spinning, so that
+ // we should always have a thread about to wake up, but most threads are sleeping:
+ // 10us per spinning thread, capped at 1ms, scaled by a random factor in [0,1), floored at 10us
+ // (the compound *= truncates the double product back to long)
+ long sleep = 10000 * pool.spinningCount.get();
+ sleep = Math.min(1000000, sleep);
+ sleep *= Math.random();
+ sleep = Math.max(10000, sleep);
+
+ long start = System.nanoTime();
+
+ // place ourselves in the spinning collection, keyed by our intended wake-up time; if we clash with another
+ // thread just exit without sleeping (run() will loop and try again)
+ Long target = start + sleep;
+ if (pool.spinning.putIfAbsent(target, this) != null)
+ return;
+ LockSupport.parkNanos(sleep);
+
+ // remove ourselves (if haven't been already) - we should be at or near the front, so should be cheap-ish
+ // (presumably pool.spinning is ordered by key - confirm in SharedExecutorPool)
+ pool.spinning.remove(target, this);
+
+ // measure the time actually spent spinning and add it to the shared stopCheck
+ long end = System.nanoTime();
+ long spin = end - start;
+ long stopCheck = pool.stopCheck.addAndGet(spin);
+ maybeStop(stopCheck, end);
+ // if nobody else added to stopCheck since our previous spin, we were the only spinner during this interval
+ if (prevStopCheck + spin == stopCheck)
+ soleSpinnerSpinTime += spin;
+ else
+ soleSpinnerSpinTime = 0;
+ prevStopCheck = stopCheck;
+ }
+
+ // 10ms in nanoseconds: how far behind "now" maybeStop resets pool.stopCheck, and the spin budget of a sole spinner
+ private static final long stopCheckInterval = TimeUnit.MILLISECONDS.toNanos(10L);
+
+ // stops a worker if elapsed real time is less than elapsed spin time, as this implies the equivalent of
+ // at least one worker achieved nothing in the interval. we achieve this by maintaining a stopCheck which
+ // is initialised to a negative offset from realtime; as we spin we add to this value, and if we ever exceed
+ // realtime we have spun too much and deschedule; if we get too far behind realtime, we reset to our initial offset
+ // stopCheck: the value of pool.stopCheck just after our own addAndGet; now: System.nanoTime() at the end of our spin
+ private void maybeStop(long stopCheck, long now)
+ {
+ long delta = now - stopCheck;
+ if (delta <= 0)
+ {
+ // if stopCheck has caught up with present, we've been spinning too much, so if we can atomically
+ // set it to the past again, we should stop a worker
+ if (pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
+ {
+ // try and stop ourselves;
+ // if we've already been assigned work stop another worker
+ if (!assign(Work.STOP_SIGNALLED, true))
+ pool.schedule(Work.STOP_SIGNALLED);
+ }
+ }
+ else if (soleSpinnerSpinTime > stopCheckInterval && pool.spinningCount.get() == 1)
+ {
+ // the coordinated check above needs several spinners to fire; a lone spinner instead stops itself once it
+ // alone has spun for longer than stopCheckInterval. permit self-stopping
+ assign(Work.STOP_SIGNALLED, true);
+ }
+ else
+ {
+ // if stop check has gotten too far behind present, update it so new spins can affect it
+ while (delta > stopCheckInterval * 2 && !pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
+ {
+ stopCheck = pool.stopCheck.get();
+ delta = now - stopCheck;
+ }
+ }
+ }
+
+ // true if this worker's current state is SPINNING
+ private boolean isSpinning()
+ {
+ Work current = get();
+ return current.isSpinning();
+ }
+
+ // moves this worker from STOP_SIGNALLED to STOPPED; false if we were not signalled or the CAS lost a race
+ private boolean stop()
+ {
+ if (!get().isStop())
+ return false;
+ return compareAndSet(Work.STOP_SIGNALLED, Work.STOPPED);
+ }
+
+ // true if this worker's current state is STOPPED
+ private boolean isStopped()
+ {
+ Work current = get();
+ return current.isStopped();
+ }
+
+ /**
+ * Represents, and communicates changes to, a worker's work state - there are three non-actively-working
+ * states (STOP_SIGNALLED, STOPPED, and SPINNING) and two working states: WORKING, and (ASSIGNED), the last
+ * being represented by a non-static instance with its "assigned" executor set.
+ *
+ * STOPPED: indicates the worker is descheduled, and whilst it accepts work in this state (causing it to
+ * be rescheduled) it will generally not be considered for work until all other worker threads are busy.
+ * In this state we should be present in the pool.descheduled collection, and should be parked
+ * -> (ASSIGNED)|SPINNING
+ * STOP_SIGNALLED: the worker has been asked to deschedule itself, but has not yet done so; only entered from a SPINNING
+ * state, and generally communicated to itself, but may be set by any worker. This state may be preempted
+ * and replaced with (ASSIGNED) or SPINNING
+ * In this state we should be present in the pool.descheduled collection
+ * -> (ASSIGNED)|STOPPED|SPINNING
+ * SPINNING: indicates the worker has no work to perform, so is performing a friendly wait-based-spinning
+ * until it either is (ASSIGNED) some work (by itself or another thread), or sent STOP_SIGNALLED
+ * In this state we _may_ be in the pool.spinning collection (but only if we are in the middle of a sleep)
+ * -> (ASSIGNED)|STOP_SIGNALLED|SPINNING
+ * (ASSIGNED): asks the worker to perform some work against the specified executor, and preassigns a task permit
+ * from that executor so that in this state there is always work to perform.
+ * In general a worker assigns itself this state, but sometimes it may assign another worker the state
+ * either if there is work outstanding and no spinning threads, or if there is a race to self-assign
+ * -> WORKING
+ * WORKING: indicates the worker is actively processing an executor's task queue; in this state it accepts
+ * no state changes/communications, except from itself; it usually exits this mode into SPINNING,
+ * but if work is immediately available on another executor it self-triggers (ASSIGNED)
+ * -> SPINNING|(ASSIGNED)
+ */
+
+ static final class Work
+ {
+ // identity-compared sentinel states; none of them carries an executor
+ static final Work STOP_SIGNALLED = new Work();
+ static final Work STOPPED = new Work();
+ static final Work SPINNING = new Work();
+ static final Work WORKING = new Work();
+
+ // the executor to take work from when this instance is an (ASSIGNED) state; null for every sentinel
+ final SEPExecutor assigned;
+
+ // creates an (ASSIGNED) state targeting the given executor
+ Work(SEPExecutor executor)
+ {
+ this.assigned = executor;
+ }
+
+ // creates a sentinel state with no executor
+ private Work()
+ {
+ this(null);
+ }
+
+ /**
+ * Whether new work may be handed to a worker currently in this state: never if work is already
+ * assigned; otherwise always when a worker assigns to itself (self == true), and only if the target
+ * is not WORKING when another thread is doing the assigning.
+ */
+ boolean canAssign(boolean self)
+ {
+ if (isAssigned())
+ return false;
+ return self || this != WORKING;
+ }
+
+ boolean isSpinning()
+ {
+ return SPINNING == this;
+ }
+
+ boolean isWorking()
+ {
+ return WORKING == this;
+ }
+
+ // true for STOP_SIGNALLED only (not for STOPPED)
+ boolean isStop()
+ {
+ return STOP_SIGNALLED == this;
+ }
+
+ boolean isStopped()
+ {
+ return STOPPED == this;
+ }
+
+ // true only for an (ASSIGNED) state, i.e. one whose executor is non-null
+ boolean isAssigned()
+ {
+ return null != assigned;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
new file mode 100644
index 0000000..e03ec57
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.concurrent.SEPWorker.Work;
+
+/**
+ * A pool of worker threads that are shared between all Executors created with it. Each executor is treated as a distinct
+ * unit, with its own concurrency and task queue limits, but the threads that service the tasks on each executor are
+ * free to hop between executors at will.
+ *
+ * To keep producers from incurring unnecessary delays, once an executor is "spun up" (i.e. is processing tasks at a steady
+ * rate), adding tasks to the executor often involves only placing the task on the work queue and updating the
+ * task permits (which imposes our max queue length constraints). Only when it cannot be guaranteed the task will be serviced
+ * promptly does the producer have to signal a thread itself to perform the work.
+ *
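+ * We do this by scheduling a new spinning worker only if no worker is currently spinning (see
+ * maybeStartSpinningWorker()); otherwise an already-spinning worker is relied upon to find the task when it wakes.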
+ *
+ * The worker threads schedule themselves as far as possible: when they are assigned a task, they will attempt to spawn
+ * a partner worker to service any other work outstanding on the queue (if any); once they have finished the task they
+ * will either take another (if any remaining) and repeat this, or they will attempt to assign themselves to another executor
+ * that does have tasks remaining. If both fail, they will enter a non-busy-spinning phase, where they will sleep for a short
+ * random interval (based upon the number of threads in this mode, so that the total amount of non-sleeping time remains
+ * approximately fixed regardless of the number of spinning threads), and upon waking up will again try to assign themselves
+ * an executor with outstanding tasks to perform.
+ */
+public class SharedExecutorPool
+{
+
+ // name given to this pool's worker threads; workerId supplies each new worker's numeric suffix
+ final String poolName;
+ final AtomicLong workerId = new AtomicLong();
+
+ // the executors serviced by this pool (intended to be periodically ordered by traffic volume - not done in this class)
+ final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+
+ // number of workers currently in a spinning state
+ final AtomicInteger spinningCount = new AtomicInteger();
+ // shared spin-time versus real-time accounting; see SEPWorker.maybeStop()
+ final AtomicLong stopCheck = new AtomicLong();
+ // workers that are (most likely) in the middle of a sleep-spin, keyed by wake-up time; schedule() tries these first
+ // TODO: consider using a queue partially-ordered by scheduled wake-up time
+ // (a full-fledged correctly ordered SkipList is overkill)
+ final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>();
+ // workers that have been asked to stop/deschedule; schedule() only tries these once spinning is empty
+ final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
+
+ public SharedExecutorPool(String poolName)
+ {
+ this.poolName = poolName;
+ }
+
+ /**
+ * Hands the given work to an existing worker, or starts a new worker for it.
+ *
+ * Candidates are removed one at a time from the spinning map, then from the descheduled map, until one
+ * accepts the work via assign(work, false). If none accepts it, a new SEPWorker is created with this work,
+ * except for a stop signal, for which no new worker is started.
+ */
+ void schedule(Work work)
+ {
+ for (;;)
+ {
+ Map.Entry<Long, SEPWorker> candidate = spinning.pollFirstEntry();
+ if (candidate == null)
+ candidate = descheduled.pollFirstEntry();
+ if (candidate == null)
+ break;
+ if (candidate.getValue().assign(work, false))
+ return;
+ }
+
+ if (work.isStop())
+ return;
+ new SEPWorker(workerId.incrementAndGet(), work, this);
+ }
+
+ /**
+ * Schedules a SPINNING worker if no worker is currently counted as spinning. Workers normally maintain
+ * spinningCount themselves; here we only claim the count with a CAS from zero to one, so that of several
+ * concurrent callers that observe zero only one goes on to schedule.
+ */
+ void maybeStartSpinningWorker()
+ {
+ if (spinningCount.get() != 0)
+ return;
+ if (spinningCount.compareAndSet(0, 1))
+ schedule(Work.SPINNING);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 512d64a..303f658 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -45,10 +45,10 @@ public class StageManager
static
{
- stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
- stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
- stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
- stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
+ stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
+ stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
+ stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
+ stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
// the rest are all single-threaded
stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
@@ -87,14 +87,9 @@ public class StageManager
stage.getJmxType());
}
- private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+ private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
{
- return new JMXConfigurableThreadPoolExecutor(numThreads,
- KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory(stage.getJmxName()),
- stage.getJmxType());
+ return JMXEnabledSharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxName(), stage.getJmxType());
}
/**
@@ -134,6 +129,11 @@ public class StageManager
super.execute(command);
}
+ // This executor makes no attempt to run the command on the calling thread; it simply delegates to execute(command).
+ public void maybeExecuteImmediately(Runnable command)
+ {
+ execute(command);
+ }
+
@Override
public Future<?> submit(Runnable task)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
index e5dcd7e..f580fea 100644
--- a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -30,4 +30,7 @@ public interface TracingAwareExecutorService extends ExecutorService
// we need a way to inject a TraceState directly into the Executor context without going through
// the global Tracing sessions; see CASSANDRA-5668
public void execute(Runnable command, TraceState state);
+
+ // permits, but does not require, the implementation to run the command in the context of the submitting
+ // thread instead of handing it off; an implementation may equally just call execute(command)
+ public void maybeExecuteImmediately(Runnable command);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/SEPMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/SEPMetrics.java b/src/java/org/apache/cassandra/metrics/SEPMetrics.java
new file mode 100644
index 0000000..fbccc3b
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/SEPMetrics.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import org.apache.cassandra.concurrent.SEPExecutor;
+
+public class SEPMetrics
+{
+ // metric names, shared by registration (constructor) and removal (release()) so the two cannot drift apart
+ private static final String ACTIVE_TASKS = "ActiveTasks";
+ private static final String PENDING_TASKS = "PendingTasks";
+ private static final String TOTAL_BLOCKED_TASKS = "TotalBlockedTasks";
+ private static final String CURRENTLY_BLOCKED_TASKS = "CurrentlyBlockedTasks";
+ private static final String COMPLETED_TASKS = "CompletedTasks";
+
+ /** Number of active tasks. */
+ public final Gauge<Integer> activeTasks;
+ /** Number of tasks that had blocked before being accepted (or rejected). */
+ public final Gauge<Integer> totalBlocked;
+ /**
+ * Number of tasks currently blocked, waiting to be accepted by
+ * the executor (because all threads are busy and the backing queue is full).
+ */
+ public final Gauge<Long> currentBlocked;
+ /** Number of completed tasks. */
+ public final Gauge<Long> completedTasks;
+
+ /** Number of tasks waiting to be executed. */
+ public final Gauge<Long> pendingTasks;
+
+ // builds the full name of each metric; assigned exactly once in the constructor
+ private final MetricNameFactory factory;
+
+ /**
+ * Create and register metrics for the given SEPExecutor. Each gauge queries the executor every time it is read.
+ *
+ * @param executor the executor whose counters are exposed
+ * @param path Type of thread pool
+ * @param poolName Name of thread pool to identify metrics
+ */
+ public SEPMetrics(final SEPExecutor executor, String path, String poolName)
+ {
+ this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
+ activeTasks = Metrics.newGauge(factory.createMetricName(ACTIVE_TASKS), new Gauge<Integer>()
+ {
+ @Override
+ public Integer value()
+ {
+ return executor.getActiveCount();
+ }
+ });
+ pendingTasks = Metrics.newGauge(factory.createMetricName(PENDING_TASKS), new Gauge<Long>()
+ {
+ @Override
+ public Long value()
+ {
+ return executor.getPendingTasks();
+ }
+ });
+ totalBlocked = Metrics.newGauge(factory.createMetricName(TOTAL_BLOCKED_TASKS), new Gauge<Integer>()
+ {
+ @Override
+ public Integer value()
+ {
+ return executor.getTotalBlockedTasks();
+ }
+ });
+ currentBlocked = Metrics.newGauge(factory.createMetricName(CURRENTLY_BLOCKED_TASKS), new Gauge<Long>()
+ {
+ @Override
+ public Long value()
+ {
+ // cast to long to fit the Gauge<Long> type of this metric
+ return (long) executor.getCurrentlyBlockedTasks();
+ }
+ });
+ completedTasks = Metrics.newGauge(factory.createMetricName(COMPLETED_TASKS), new Gauge<Long>()
+ {
+ @Override
+ public Long value()
+ {
+ return executor.getCompletedTasks();
+ }
+ });
+ }
+
+ /**
+ * Removes every gauge registered by the constructor from the default metrics registry.
+ */
+ public void release()
+ {
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(ACTIVE_TASKS));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(PENDING_TASKS));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(COMPLETED_TASKS));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(TOTAL_BLOCKED_TASKS));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(CURRENTLY_BLOCKED_TASKS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
new file mode 100644
index 0000000..4afc4d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetricNameFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.core.MetricName;
+
+class ThreadPoolMetricNameFactory implements MetricNameFactory
+{
+ private final String type;
+ private final String path;
+ private final String poolName;
+
+ /**
+ * @param type the metric type (e.g. "ThreadPools"), used as the JMX "type" key
+ * @param path the type of thread pool, used as the JMX "path" key
+ * @param poolName the pool's name, used as the JMX "scope" key
+ */
+ ThreadPoolMetricNameFactory(String type, String path, String poolName)
+ {
+ this.type = type;
+ this.path = path;
+ this.poolName = poolName;
+ }
+
+ /**
+ * Builds a metric name in the metrics package's group with scope {@code path.poolName} and the JMX name
+ * {@code <group>:type=<type>,path=<path>,scope=<poolName>,name=<metricName>}.
+ */
+ @Override
+ public MetricName createMetricName(String metricName)
+ {
+ final String group = ThreadPoolMetrics.class.getPackage().getName();
+ final String scope = path + "." + poolName;
+ final String mbean = new StringBuilder(group)
+ .append(":")
+ .append("type=").append(type)
+ .append(",path=").append(path)
+ .append(",scope=").append(poolName)
+ .append(",name=").append(metricName)
+ .toString();
+ return new MetricName(group, type, metricName, scope, mbean);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
index af54cdb..3cebf07 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
@@ -52,7 +52,7 @@ public class ThreadPoolMetrics
*/
public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName)
{
- this.factory = new ThreadPoolMetricNameFactory(path, poolName);
+ this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
{
@@ -87,30 +87,4 @@ public class ThreadPoolMetrics
Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalBlockedTasks"));
Metrics.defaultRegistry().removeMetric(factory.createMetricName("CurrentlyBlockedTasks"));
}
-
- class ThreadPoolMetricNameFactory implements MetricNameFactory
- {
- private final String path;
- private final String poolName;
-
- ThreadPoolMetricNameFactory(String path, String poolName)
- {
- this.path = path;
- this.poolName = poolName;
- }
-
- public MetricName createMetricName(String metricName)
- {
- String groupName = ThreadPoolMetrics.class.getPackage().getName();
- String type = "ThreadPools";
- StringBuilder mbeanName = new StringBuilder();
- mbeanName.append(groupName).append(":");
- mbeanName.append("type=").append(type);
- mbeanName.append(",path=").append(path);
- mbeanName.append(",scope=").append(poolName);
- mbeanName.append(",name=").append(metricName);
-
- return new MetricName(groupName, type, metricName, path + "." + poolName, mbeanName.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e5db1d7..3e88b37 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.service.paxos.PrepareResponse;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
public final class MessagingService implements MessagingServiceMBean
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 2e1adcb..af00403 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
/**
* RepairJob runs repair on given ColumnFamily.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 1b6375e..507dafa 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
/**
* Coordinates the (active) repair of a token range.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3f57e73..2c3261f 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,12 +77,12 @@ public abstract class AbstractReadExecutor
protected void makeDataRequests(Iterable<InetAddress> endpoints)
{
+ boolean readLocal = false;
for (InetAddress endpoint : endpoints)
{
if (isLocalRequest(endpoint))
{
- logger.trace("reading data locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+ readLocal = true;
}
else
{
@@ -90,6 +90,11 @@ public abstract class AbstractReadExecutor
MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
}
}
+ if (readLocal)
+ {
+ logger.trace("reading data locally");
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
+ }
}
protected void makeDigestRequests(Iterable<InetAddress> endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index c75aac2..ece9289 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
public abstract class AbstractWriteResponseHandler implements IAsyncCallback
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 64ef443..29eaadf 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d01dd99..2cbc475 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -780,6 +780,7 @@ public class StorageProxy implements StorageProxyMBean
// only need to create a Message for non-local writes
MessageOut<Mutation> message = null;
+ boolean insertLocal = false;
for (InetAddress destination : targets)
{
// avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
@@ -797,7 +798,7 @@ public class StorageProxy implements StorageProxyMBean
{
if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- insertLocal(mutation, responseHandler);
+ insertLocal = true;
}
else
{
@@ -835,6 +836,9 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ if (insertLocal)
+ insertLocal(mutation, responseHandler);
+
if (dcGroups != null)
{
// for each datacenter, send the message to one node to relay the write to other replicas
@@ -944,7 +948,7 @@ public class StorageProxy implements StorageProxyMBean
private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
{
- StageManager.getStage(Stage.MUTATION).execute(new LocalMutationRunnable()
+ StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 3bacad8..cce8ecc 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
public class TruncateResponseHandler implements IAsyncCallback
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8e3840c..a60ab84 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -59,7 +59,7 @@ import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.StreamManagerMBean;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
/**
* JMX client operations for Cassandra.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0ad4312..9e8719e 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -335,7 +335,7 @@ public abstract class Message
}
void start()
{
- if (running.compareAndSet(false, true))
+ if (!running.get() && running.compareAndSet(false, true))
{
this.eventLoop.execute(this);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
index ee7d127..9cac645 100644
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -18,29 +18,24 @@
package org.apache.cassandra.transport;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
+import static org.apache.cassandra.concurrent.JMXEnabledSharedExecutorPool.SHARED;
+
public class RequestThreadPoolExecutor extends AbstractEventExecutor
{
-
- private final static int CORE_THREAD_TIMEOUT_SEC = 30;
- // Number of request we accept to queue before blocking. We could allow this to be configured...
private final static int MAX_QUEUED_REQUESTS = 128;
-
private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
- private final JMXEnabledThreadPoolExecutor wrapped = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
- CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(MAX_QUEUED_REQUESTS),
- new NamedThreadFactory(THREAD_FACTORY_ID),
- "transport");
+ private final TracingAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+ MAX_QUEUED_REQUESTS,
+ THREAD_FACTORY_ID,
+ "transport");
public boolean isShuttingDown()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java
deleted file mode 100644
index 4d5f896..0000000
--- a/src/java/org/apache/cassandra/utils/SimpleCondition.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-
-// fulfils the Condition interface without spurious wakeup problems
-// (or lost notify problems either: that is, even if you call await()
-// _after_ signal(), it will work as desired.)
-public class SimpleCondition implements Condition
-{
- private boolean set;
-
- public synchronized void await() throws InterruptedException
- {
- while (!set)
- wait();
- }
-
- public synchronized void reset()
- {
- set = false;
- }
-
- public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
- {
- long start = System.nanoTime();
- long timeout = unit.toNanos(time);
- long elapsed;
- while (!set && (elapsed = System.nanoTime() - start) < timeout)
- {
- TimeUnit.NANOSECONDS.timedWait(this, timeout - elapsed);
- }
- return set;
- }
-
- public void signal()
- {
- throw new UnsupportedOperationException();
- }
-
- public synchronized void signalAll()
- {
- set = true;
- notifyAll();
- }
-
- public synchronized boolean isSignaled()
- {
- return set;
- }
-
- public void awaitUninterruptibly()
- {
- throw new UnsupportedOperationException();
- }
-
- public long awaitNanos(long nanosTimeout) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean awaitUntil(Date deadline) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
new file mode 100644
index 0000000..57614e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Condition;
+
+// Fulfils the Condition interface without spurious wakeup problems (only signalAll() and the interruptible
+// await()/await(long, TimeUnit) are supported), and without lost notify problems: even if you call await()
+// _after_ signalAll(), it returns immediately. Once signalled, the condition stays signalled.
+public class SimpleCondition implements Condition
+{
+    private static final AtomicReferenceFieldUpdater<SimpleCondition, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(SimpleCondition.class, WaitQueue.class, "waiting");
+
+    private volatile WaitQueue waiting; // installed lazily (via waitingUpdater) by the first thread that has to wait
+    private volatile boolean signaled = false; // set by signalAll(); never reset
+
+    public void await() throws InterruptedException // blocks until signalAll() has been called, or throws if interrupted
+    {
+        if (isSignaled())
+            return;
+        if (waiting == null)
+            waitingUpdater.compareAndSet(this, null, new WaitQueue()); // if we lose the race, another thread installed it
+        WaitQueue.Signal s = waiting.register();
+        if (isSignaled()) // re-check after registering so a racing signalAll() cannot be missed
+            s.cancel();
+        else
+            s.await();
+        assert isSignaled();
+    }
+
+    public boolean await(long time, TimeUnit unit) throws InterruptedException // true iff signalled before the timeout
+    {
+        if (isSignaled())
+            return true;
+        long start = System.nanoTime(), timeout = Math.max(0L, unit.toNanos(time)); // negative timeout == no wait
+        long until = start + timeout < start ? Long.MAX_VALUE : start + timeout; // saturate: huge timeouts must not wrap into the past
+        if (waiting == null)
+            waitingUpdater.compareAndSet(this, null, new WaitQueue()); // if we lose the race, another thread installed it
+        WaitQueue.Signal s = waiting.register();
+        if (isSignaled()) // re-check after registering so a racing signalAll() cannot be missed
+        {
+            s.cancel();
+            return true;
+        }
+        return s.awaitUntil(until) || isSignaled(); // re-check: signalAll() may race with the deadline
+    }
+
+    public void signal()
+    {
+        throw new UnsupportedOperationException(); // only signalAll() is supported
+    }
+
+    public boolean isSignaled()
+    {
+        return signaled;
+    }
+
+    public void signalAll()
+    {
+        signaled = true; // publish before waking waiters; they re-check it after registering
+        if (waiting != null) // no queue installed means no thread has registered to wait yet
+            waiting.signalAll();
+    }
+
+    public void awaitUninterruptibly()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public long awaitNanos(long nanosTimeout) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean awaitUntil(Date deadline) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5420b7a2/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
index 8d072ea..2322210 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
@@ -73,8 +74,6 @@ import java.util.concurrent.locks.LockSupport;
public final class WaitQueue
{
- private static final Logger logger = LoggerFactory.getLogger(WaitQueue.class);
-
private static final int CANCELLED = -1;
private static final int SIGNALLED = 1;
private static final int NOT_SET = 0;
@@ -82,11 +81,11 @@ public final class WaitQueue
private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state");
// the waiting signals
- private final ConcurrentLinkedDeque<RegisteredSignal> queue = new ConcurrentLinkedDeque<>();
+ private final ConcurrentLinkedQueue<RegisteredSignal> queue = new ConcurrentLinkedQueue<>();
/**
* The calling thread MUST be the thread that uses the signal
- * @return
+     * @return the Signal registered for the calling thread, which that thread should then await or cancel
*/
public Signal register()
{
@@ -119,7 +118,7 @@ public final class WaitQueue
while (true)
{
RegisteredSignal s = queue.poll();
- if (s == null || s.signal())
+ if (s == null || s.signal() != null)
return s != null;
}
}
@@ -129,42 +128,40 @@ public final class WaitQueue
*/
public void signalAll()
{
- RegisteredSignal last = queue.peekLast();
- if (last == null)
+ if (!hasWaiters())
return;
- List<Thread> woke = null;
- if (logger.isTraceEnabled())
- woke = new ArrayList<>();
- long start = System.nanoTime();
- // we wake up only a snapshot of the queue, to avoid a race where the condition is not met and the woken thread
- // immediately waits on the queue again
+
+        // to avoid a race where the condition is not met and a woken thread manages to wait on the queue again before
+        // we finish signalling them all, we remember one thread we have woken up, so that if we encounter it again we know
+        // we're looping. The remembered thread is re-sampled at the 5th, 10th, 20th, ... signal (progressively less often).
+        // The "correct" solution is a queue that permits snapshot iteration, but this approach is sufficient.
+ int i = 0, s = 5;
+ Thread randomThread = null;
Iterator<RegisteredSignal> iter = queue.iterator();
while (iter.hasNext())
{
RegisteredSignal signal = iter.next();
- if (logger.isTraceEnabled())
+ Thread signalled = signal.signal();
+
+ if (signalled != null)
{
- Thread thread = signal.thread;
- if (signal.signal())
- woke.add(thread);
+ if (signalled == randomThread)
+ break;
+
+ if (++i == s)
+ {
+ randomThread = signalled;
+ s <<= 1;
+ }
}
- else
- signal.signal();
iter.remove();
-
- if (signal == last)
- break;
}
- long end = System.nanoTime();
- if (woke != null)
- logger.trace("Woke up {} in {}ms from {}", woke, (end - start) * 0.000001d, Thread.currentThread().getStackTrace()[2]);
}
private void cleanUpCancelled()
{
- // attempt to remove the cancelled from the beginning only, but if we fail to remove any proceed to cover
- // the whole list
+ // TODO: attempt to remove the cancelled from the beginning only (need atomic cas of head)
Iterator<RegisteredSignal> iter = queue.iterator();
while (iter.hasNext())
{
@@ -185,7 +182,7 @@ public final class WaitQueue
*/
public int getWaiting()
{
- if (queue.isEmpty())
+ if (!hasWaiters())
return 0;
Iterator<RegisteredSignal> iter = queue.iterator();
int count = 0;
@@ -264,11 +261,11 @@ public final class WaitQueue
* isSignalled() will be true on exit, and the method will return true; if timedout, the method will return
* false and isCancelled() will be true; if interrupted an InterruptedException will be thrown and isCancelled()
* will be true.
- * @param until System.currentTimeMillis() to wait until
+     * @param nanos the absolute System.nanoTime() value (deadline) to wait until
* @return true if signalled, false if timed out
* @throws InterruptedException
*/
- public boolean awaitUntil(long until) throws InterruptedException;
+ public boolean awaitUntil(long nanos) throws InterruptedException;
}
/**
@@ -302,10 +299,12 @@ public final class WaitQueue
public boolean awaitUntil(long until) throws InterruptedException
{
- while (until < System.currentTimeMillis() && !isSignalled())
+ long now;
+ while (until > (now = System.nanoTime()) && !isSignalled())
{
checkInterrupted();
- LockSupport.parkUntil(until);
+ long delta = until - now;
+ LockSupport.parkNanos(delta);
}
return checkAndClear();
}
@@ -343,15 +342,16 @@ public final class WaitQueue
return state != NOT_SET;
}
- private boolean signal()
+ private Thread signal()
{
if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED))
{
+ Thread thread = this.thread;
LockSupport.unpark(thread);
- thread = null;
- return true;
+ this.thread = null;
+ return thread;
}
- return false;
+ return null;
}
public boolean checkAndClear()