You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/13 20:48:55 UTC

[04/13] cassandra git commit: Make sure client gets tombstone overwhelmed warning

Make sure client gets tombstone overwhelmed warning

patch by Carl Yeksigian; reviewed by Josh McKenzie for CASSANDRA-9465


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbf6e62c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbf6e62c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbf6e62c

Branch: refs/heads/trunk
Commit: dbf6e62c382d62f9c1727ecf5afb90d131a81775
Parents: 582bdba
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jan 13 13:22:36 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jan 13 14:35:59 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../AbstractLocalAwareExecutorService.java      | 229 +++++++++++++++++++
 .../AbstractTracingAwareExecutorService.java    | 229 -------------------
 .../DebuggableThreadPoolExecutor.java           |  48 ++--
 .../cassandra/concurrent/ExecutorLocal.java     |  44 ++++
 .../cassandra/concurrent/ExecutorLocals.java    |  84 +++++++
 .../concurrent/LocalAwareExecutorService.java   |  34 +++
 .../cassandra/concurrent/SEPExecutor.java       |   3 +-
 .../concurrent/SharedExecutorPool.java          |   2 +-
 .../cassandra/concurrent/StageManager.java      |  12 +-
 .../concurrent/TracingAwareExecutorService.java |  36 ---
 .../cql3/statements/BatchStatement.java         |  11 +-
 .../cql3/statements/SelectStatement.java        |   4 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |   2 +-
 .../apache/cassandra/net/MessagingService.java  |   8 +-
 .../apache/cassandra/service/ClientWarn.java    |  62 +++--
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   3 +-
 .../org/apache/cassandra/transport/Message.java |   6 +-
 .../transport/RequestThreadPoolExecutor.java    |   4 +-
 .../cassandra/service/ClientWarningsTest.java   |  43 ++++
 21 files changed, 529 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11f2529..6530956 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465)
  * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection
    timeout (CASSANDRA-10959)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
new file mode 100644
index 0000000..088b43e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.tracing.Tracing.isTracing;
+
+/**
+ * Skeleton for executor services that capture the submitting thread's {@link ExecutorLocals} when a
+ * task is created and install them on the thread that eventually runs the task.
+ *
+ * Subclasses supply the scheduling itself through {@link #addTask(FutureTask)} and are told about
+ * every finished task through {@link #onCompletion()}. The bulk {@code invokeAll}/{@code invokeAny}
+ * operations are not supported.
+ */
+public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class);
+
+    /** Hands a ready-to-run task to the concrete executor for scheduling. */
+    protected abstract void addTask(FutureTask<?> futureTask);
+
+    /** Called from {@link FutureTask#run()} once a task has finished, whether it succeeded or failed. */
+    protected abstract void onCompletion();
+
+    /** Task Submission / Creation / Objects **/
+
+    public <T> FutureTask<T> submit(Callable<T> task)
+    {
+        return submit(newTaskFor(task));
+    }
+
+    public FutureTask<?> submit(Runnable task)
+    {
+        return submit(newTaskFor(task, null));
+    }
+
+    public <T> FutureTask<T> submit(Runnable task, T result)
+    {
+        return submit(newTaskFor(task, result));
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Wraps {@code runnable} into a task, capturing the calling thread's current executor locals.
+     *
+     * @param runnable the work to run
+     * @param result   the value the returned future yields on success
+     */
+    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+    {
+        return newTaskFor(runnable, result, ExecutorLocals.create());
+    }
+
+    /**
+     * Wraps {@code runnable} into a task that runs with the given executor locals.
+     *
+     * @param runnable the work to run; returned as-is if it already is a task of the required kind
+     * @param result   the value the returned future yields on success
+     * @param locals   the locals to install while the task runs, or null to run without touching them
+     */
+    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals)
+    {
+        if (locals != null)
+        {
+            if (runnable instanceof LocalSessionFutureTask)
+                return (LocalSessionFutureTask<T>) runnable;
+            return new LocalSessionFutureTask<T>(runnable, result, locals);
+        }
+        if (runnable instanceof FutureTask)
+            return (FutureTask<T>) runnable;
+        return new FutureTask<>(runnable, result);
+    }
+
+    /**
+     * Wraps {@code callable} into a task, capturing the calling thread's current executor locals.
+     *
+     * The decision to wrap is based on whether any executor local is set (not only on whether the
+     * thread is tracing), matching the Runnable overloads so that client warning state is carried
+     * across as well.
+     */
+    protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+    {
+        ExecutorLocals locals = ExecutorLocals.create();
+        if (locals != null)
+        {
+            if (callable instanceof LocalSessionFutureTask)
+                return (LocalSessionFutureTask<T>) callable;
+            return new LocalSessionFutureTask<T>(callable, locals);
+        }
+        if (callable instanceof FutureTask)
+            return (FutureTask<T>) callable;
+        return new FutureTask<>(callable);
+    }
+
+    /**
+     * A task that installs the captured executor locals for the duration of its run and puts back
+     * whatever the running thread had before, even if the task throws.
+     */
+    private class LocalSessionFutureTask<T> extends FutureTask<T>
+    {
+        private final ExecutorLocals locals;
+
+        public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals)
+        {
+            super(callable);
+            this.locals = locals;
+        }
+
+        public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals)
+        {
+            super(runnable, result);
+            this.locals = locals;
+        }
+
+        public void run()
+        {
+            // remember the running thread's own locals so they can be restored afterwards
+            ExecutorLocals old = ExecutorLocals.create();
+            ExecutorLocals.set(locals);
+            try
+            {
+                super.run();
+            }
+            finally
+            {
+                ExecutorLocals.set(old);
+            }
+        }
+    }
+
+    /**
+     * A non-cancellable future: it is done once {@link #run()} has signalled, and holds either the
+     * callable's return value or the throwable it raised.
+     */
+    class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+    {
+        // true when result holds the Throwable raised by the callable
+        private boolean failure;
+        // placeholder until the callable completes; then its return value or the raised Throwable
+        private Object result = this;
+        private final Callable<T> callable;
+
+        public FutureTask(Callable<T> callable)
+        {
+            this.callable = callable;
+        }
+        public FutureTask(Runnable runnable, T result)
+        {
+            this(Executors.callable(runnable, result));
+        }
+
+        public void run()
+        {
+            try
+            {
+                result = callable.call();
+            }
+            catch (Throwable t)
+            {
+                JVMStabilityInspector.inspectThrowable(t);
+                logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
+                result = t;
+                failure = true;
+            }
+            finally
+            {
+                // wake up waiters first, then let the executor account for the finished task
+                signalAll();
+                onCompletion();
+            }
+        }
+
+        /** Cancellation is not supported; always returns false. */
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            return false;
+        }
+
+        public boolean isCancelled()
+        {
+            return false;
+        }
+
+        public boolean isDone()
+        {
+            return isSignaled();
+        }
+
+        /**
+         * Waits for the task to finish.
+         *
+         * @return the callable's return value
+         * @throws ExecutionException wrapping whatever the callable threw
+         */
+        public T get() throws InterruptedException, ExecutionException
+        {
+            await();
+            Object result = this.result;
+            if (failure)
+                throw new ExecutionException((Throwable) result);
+            return (T) result;
+        }
+
+        /**
+         * Waits at most the given time for the task to finish.
+         *
+         * @return the callable's return value
+         * @throws ExecutionException wrapping whatever the callable threw
+         * @throws TimeoutException   if the task had not finished when the wait elapsed
+         */
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+        {
+            // a false return means the wait elapsed without a signal; without this check the
+            // placeholder value of result would be handed back as if it were the task's outcome
+            if (!await(timeout, unit))
+                throw new TimeoutException();
+            Object result = this.result;
+            if (failure)
+                throw new ExecutionException((Throwable) result);
+            return (T) result;
+        }
+    }
+
+    private <T> FutureTask<T> submit(FutureTask<T> task)
+    {
+        addTask(task);
+        return task;
+    }
+
+    /** Runs {@code command} with the executor locals of the calling thread. */
+    public void execute(Runnable command)
+    {
+        // the two-argument newTaskFor captures the current locals itself; there is no result to report
+        addTask(newTaskFor(command, null));
+    }
+
+    /** Runs {@code command} with the supplied executor locals instead of the calling thread's. */
+    public void execute(Runnable command, ExecutorLocals locals)
+    {
+        addTask(newTaskFor(command, null, locals));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
deleted file mode 100644
index fb753b0..0000000
--- a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-import static org.apache.cassandra.tracing.Tracing.isTracing;
-
-public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService
-{
-    private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class);
-
-    protected abstract void addTask(FutureTask<?> futureTask);
-    protected abstract void onCompletion();
-
-    /** Task Submission / Creation / Objects **/
-
-    public <T> FutureTask<T> submit(Callable<T> task)
-    {
-        return submit(newTaskFor(task));
-    }
-
-    public FutureTask<?> submit(Runnable task)
-    {
-        return submit(newTaskFor(task, null));
-    }
-
-    public <T> FutureTask<T> submit(Runnable task, T result)
-    {
-        return submit(newTaskFor(task, result));
-    }
-
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
-    {
-        return newTaskFor(runnable, result, Tracing.instance.get());
-    }
-
-    protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState)
-    {
-        if (traceState != null)
-        {
-            if (runnable instanceof TraceSessionFutureTask)
-                return (TraceSessionFutureTask<T>) runnable;
-            return new TraceSessionFutureTask<T>(runnable, result, traceState);
-        }
-        if (runnable instanceof FutureTask)
-            return (FutureTask<T>) runnable;
-        return new FutureTask<>(runnable, result);
-    }
-
-    protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
-    {
-        if (isTracing())
-        {
-            if (callable instanceof TraceSessionFutureTask)
-                return (TraceSessionFutureTask<T>) callable;
-            return new TraceSessionFutureTask<T>(callable, Tracing.instance.get());
-        }
-        if (callable instanceof FutureTask)
-            return (FutureTask<T>) callable;
-        return new FutureTask<>(callable);
-    }
-
-    private class TraceSessionFutureTask<T> extends FutureTask<T>
-    {
-        private final TraceState state;
-
-        public TraceSessionFutureTask(Callable<T> callable, TraceState state)
-        {
-            super(callable);
-            this.state = state;
-        }
-
-        public TraceSessionFutureTask(Runnable runnable, T result, TraceState state)
-        {
-            super(runnable, result);
-            this.state = state;
-        }
-
-        public void run()
-        {
-            TraceState oldState = Tracing.instance.get();
-            Tracing.instance.set(state);
-            try
-            {
-                super.run();
-            }
-            finally
-            {
-                Tracing.instance.set(oldState);
-            }
-        }
-    }
-
-    class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
-    {
-        private boolean failure;
-        private Object result = this;
-        private final Callable<T> callable;
-
-        public FutureTask(Callable<T> callable)
-        {
-            this.callable = callable;
-        }
-        public FutureTask(Runnable runnable, T result)
-        {
-            this(Executors.callable(runnable, result));
-        }
-
-        public void run()
-        {
-            try
-            {
-                result = callable.call();
-            }
-            catch (Throwable t)
-            {
-                JVMStabilityInspector.inspectThrowable(t);
-                logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
-                result = t;
-                failure = true;
-            }
-            finally
-            {
-                signalAll();
-                onCompletion();
-            }
-        }
-
-        public boolean cancel(boolean mayInterruptIfRunning)
-        {
-            return false;
-        }
-
-        public boolean isCancelled()
-        {
-            return false;
-        }
-
-        public boolean isDone()
-        {
-            return isSignaled();
-        }
-
-        public T get() throws InterruptedException, ExecutionException
-        {
-            await();
-            Object result = this.result;
-            if (failure)
-                throw new ExecutionException((Throwable) result);
-            return (T) result;
-        }
-
-        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
-        {
-            await(timeout, unit);
-            Object result = this.result;
-            if (failure)
-                throw new ExecutionException((Throwable) result);
-            return (T) result;
-        }
-    }
-
-    private <T> FutureTask<T> submit(FutureTask<T> task)
-    {
-        addTask(task);
-        return task;
-    }
-
-    public void execute(Runnable command)
-    {
-        addTask(newTaskFor(command, null));
-    }
-
-    public void execute(Runnable command, TraceState state)
-    {
-        addTask(newTaskFor(command, null, state));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index a6d0049..1fb0690 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
  *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
  *   stage is less busy, core thread timeout is enabled.
  */
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
 {
     protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
     public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -146,11 +146,11 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}
 
-    public void execute(Runnable command, TraceState state)
+    public void execute(Runnable command, ExecutorLocals locals)
     {
-        super.execute(state == null || command instanceof TraceSessionWrapper
+        super.execute(locals == null || command instanceof LocalSessionWrapper
                       ? command
-                      : new TraceSessionWrapper<Object>(command, state));
+                      : new LocalSessionWrapper<Object>(command, locals));
     }
 
     public void maybeExecuteImmediately(Runnable command)
@@ -162,17 +162,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
     @Override
     public void execute(Runnable command)
     {
-        super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
-                      ? new TraceSessionWrapper<Object>(Executors.callable(command, null))
+        super.execute(isTracing() && !(command instanceof LocalSessionWrapper)
+                      ? new LocalSessionWrapper<Object>(Executors.callable(command, null))
                       : command);
     }
 
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result)
     {
-        if (isTracing() && !(runnable instanceof TraceSessionWrapper))
+        if (isTracing() && !(runnable instanceof LocalSessionWrapper))
         {
-            return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
+            return new LocalSessionWrapper<T>(Executors.callable(runnable, result));
         }
         return super.newTaskFor(runnable, result);
     }
@@ -180,9 +180,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
     {
-        if (isTracing() && !(callable instanceof TraceSessionWrapper))
+        if (isTracing() && !(callable instanceof LocalSessionWrapper))
         {
-            return new TraceSessionWrapper<T>(callable);
+            return new LocalSessionWrapper<T>(callable);
         }
         return super.newTaskFor(callable);
     }
@@ -198,9 +198,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
 
     protected static void maybeResetTraceSessionWrapper(Runnable r)
     {
-        if (r instanceof TraceSessionWrapper)
+        if (r instanceof LocalSessionWrapper)
         {
-            TraceSessionWrapper tsw = (TraceSessionWrapper) r;
+            LocalSessionWrapper tsw = (LocalSessionWrapper) r;
             // we have to reset trace state as its presence is what denotes the current thread is tracing
             // and if left this thread might start tracing unrelated tasks
             tsw.reset();
@@ -210,8 +210,8 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
     @Override
     protected void beforeExecute(Thread t, Runnable r)
     {
-        if (r instanceof TraceSessionWrapper)
-            ((TraceSessionWrapper) r).setupContext();
+        if (r instanceof LocalSessionWrapper)
+            ((LocalSessionWrapper) r).setupContext();
 
         super.beforeExecute(t, r);
     }
@@ -278,35 +278,35 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
     }
 
     /**
-     * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the TraceSessionContext and move
-     * it into the worker thread.
+     * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the ExecutorLocals and move
+     * them into the worker thread.
      *
      * @param <T>
      */
-    private static class TraceSessionWrapper<T> extends FutureTask<T>
+    private static class LocalSessionWrapper<T> extends FutureTask<T>
     {
-        private final TraceState state;
+        private final ExecutorLocals locals;
 
-        public TraceSessionWrapper(Callable<T> callable)
+        public LocalSessionWrapper(Callable<T> callable)
         {
             super(callable);
-            state = Tracing.instance.get();
+            locals = ExecutorLocals.create();
         }
 
-        public TraceSessionWrapper(Runnable command, TraceState state)
+        public LocalSessionWrapper(Runnable command, ExecutorLocals locals)
         {
             super(command, null);
-            this.state = state;
+            this.locals = locals;
         }
 
         private void setupContext()
         {
-            Tracing.instance.set(state);
+            ExecutorLocals.set(locals);
         }
 
         private void reset()
         {
-            Tracing.instance.set(null);
+            ExecutorLocals.set(null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
new file mode 100644
index 0000000..47826f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * A per-thread value that is read on the thread scheduling a task and written on the thread running it.
+ *
+ * @param <T> the type of the value held
+ */
+public interface ExecutorLocal<T>
+{
+    // every known executor local; ExecutorLocals asserts at class initialisation that it covers exactly these
+    ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance };
+
+    /**
+     * This is called when scheduling the task, and also before calling {@link ExecutorLocal#set(Object)} when
+     * running on an executor thread.
+     *
+     * @return The thread-local value that we want to copy across executor boundaries; may be null if not set.
+     */
+    T get();
+
+    /**
+     * Before a task has been run, this will be called with the value from the thread that scheduled the task, and after
+     * the task is finished, the value that was previously retrieved from this thread is restored.
+     *
+     * @param value Value to use for the executor local state; may be null.
+     */
+    void set(T value);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
new file mode 100644
index 0000000..8e6d6ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+
+/*
+ * This class only knows about Tracing and ClientWarn, so if any different executor locals are added, it must be
+ * updated.
+ *
+ * We don't enumerate the ExecutorLocal.all array each time because it would be much slower.
+ */
+public class ExecutorLocals
+{
+    private static final ExecutorLocal<TraceState> tracing = Tracing.instance;
+    private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance;
+
+    static
+    {
+        // guards against a new ExecutorLocal being registered without this class learning about it
+        assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn })
+        : "ExecutorLocals has not been updated to reflect new ExecutorLocal.all";
+    }
+
+    public final TraceState traceState;
+    public final ClientWarn.State clientWarnState;
+
+    private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState)
+    {
+        this.traceState = traceState;
+        this.clientWarnState = clientWarnState;
+    }
+
+    /**
+     * Snapshots the executor locals currently set on the calling thread.
+     *
+     * @return a snapshot holding the trace state and client warn state when at least one of them is set, or null
+     *         when neither is. Returning null lets
+     *         {@link AbstractLocalAwareExecutorService#newTaskFor(Runnable, Object, ExecutorLocals)} skip the
+     *         wrapping task and with it the calls to {@link ExecutorLocals#set(ExecutorLocals)}.
+     */
+    public static ExecutorLocals create()
+    {
+        TraceState capturedTrace = tracing.get();
+        ClientWarn.State capturedWarn = clientWarn.get();
+        boolean nothingSet = capturedTrace == null && capturedWarn == null;
+        return nothingSet ? null : new ExecutorLocals(capturedTrace, capturedWarn);
+    }
+
+    /**
+     * Builds a snapshot from an explicitly supplied trace state plus the calling thread's client warn state.
+     * Unlike {@link #create()}, this never returns null.
+     */
+    public static ExecutorLocals create(TraceState traceState)
+    {
+        return new ExecutorLocals(traceState, clientWarn.get());
+    }
+
+    /**
+     * Installs the given snapshot on the calling thread.
+     *
+     * @param locals the values to install; null sets both the trace state and the client warn state to null
+     */
+    public static void set(ExecutorLocals locals)
+    {
+        if (locals == null)
+        {
+            tracing.set(null);
+            clientWarn.set(null);
+            return;
+        }
+        tracing.set(locals.traceState);
+        clientWarn.set(locals.clientWarnState);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
new file mode 100644
index 0000000..5577d59
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+public interface LocalAwareExecutorService extends ExecutorService
+{
+    // we need a way to inject ExecutorLocals (carrying the TraceState and ClientWarn.State) directly into
+    // the Executor context without going through the global Tracing sessions; see CASSANDRA-5668
+    public void execute(Runnable command, ExecutorLocals locals);
+
+    // permits executing in the context of the submitting thread
+    public void maybeExecuteImmediately(Runnable command);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index d9a0fa8..8b12b82 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.cassandra.metrics.SEPMetrics;
@@ -30,7 +29,7 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
 
-public class SEPExecutor extends AbstractTracingAwareExecutorService
+public class SEPExecutor extends AbstractLocalAwareExecutorService
 {
     private final SharedExecutorPool pool;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 8c18c44..dfd7011 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -103,7 +103,7 @@ public class SharedExecutorPool
             schedule(Work.SPINNING);
     }
 
-    public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
+    public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
     {
         SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name);
         executors.add(executor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 4f03fd5..114795e 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -39,7 +39,7 @@ public class StageManager
 {
     private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
 
-    private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
+    private static final EnumMap<Stage, LocalAwareExecutorService> stages = new EnumMap<Stage, LocalAwareExecutorService>(Stage.class);
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
@@ -87,7 +87,7 @@ public class StageManager
                                                 stage.getJmxType());
     }
 
-    private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
+    private static LocalAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
     {
         return SharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxType(), stage.getJmxName());
     }
@@ -96,7 +96,7 @@ public class StageManager
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.
      */
-    public static TracingAwareExecutorService getStage(Stage stage)
+    public static LocalAwareExecutorService getStage(Stage stage)
     {
         return stages.get(stage);
     }
@@ -116,16 +116,16 @@ public class StageManager
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
      * tracing stage.  See CASSANDRA-1123 for background.
      */
-    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
+    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
     {
         public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
         {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
         }
 
-        public void execute(Runnable command, TraceState state)
+        public void execute(Runnable command, ExecutorLocals locals)
         {
-            assert state == null;
+            assert locals == null;
             super.execute(command);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
deleted file mode 100644
index f580fea..0000000
--- a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.tracing.TraceState;
-
-public interface TracingAwareExecutorService extends ExecutorService
-{
-    // we need a way to inject a TraceState directly into the Executor context without going through
-    // the global Tracing sessions; see CASSANDRA-5668
-    public void execute(Runnable command, TraceState state);
-
-    // permits executing in the context of the submitting thread
-    public void maybeExecuteImmediately(Runnable command);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 43a80bb..a289ad1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -273,7 +273,7 @@ public class BatchStatement implements CQLStatement
             {
                 logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
             }
-            ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {ksCfPairs, size, warnThreshold, size - warnThreshold, ""}).getMessage());
+            ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage());
         }
     }
 
@@ -305,8 +305,13 @@ public class BatchStatement implements CQLStatement
                              keySet.size(), keySet.size() == 1 ? "" : "s",
                              ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
 
-            ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
-                                                    ksCfPairs.size() == 1 ? "" : "s", ksCfPairs}).getMessage());
+            ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning,
+                                                                  new Object[]{
+                                                                              keySet.size(),
+                                                                              keySet.size() == 1 ? "" : "s",
+                                                                              ksCfPairs.size() == 1 ? "" : "s",
+                                                                              ksCfPairs
+                                                                  }).getMessage());
 
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5f142ce..5cfa94b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -251,12 +251,12 @@ public class SelectStatement implements CQLStatement
         if (!restrictions.hasPartitionKeyRestrictions())
         {
             logger.warn("Aggregation query used without partition key");
-            ClientWarn.warn("Aggregation query used without partition key");
+            ClientWarn.instance.warn("Aggregation query used without partition key");
         }
         else if (restrictions.keyIsInRelation())
         {
             logger.warn("Aggregation query used on multiple partition keys (IN restriction)");
-            ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)");
+            ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)");
         }
 
         Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index e1a68e7..d2f0bf4 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -303,7 +303,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
                                        container.metadata().getKeyValidator().getString(key.getKey()),
                                        count,
                                        getSlicesInfo(container));
-            ClientWarn.warn(msg);
+            ClientWarn.instance.warn(msg);
             logger.warn(msg);
         }
         Tracing.trace("Read {} live and {} tombstone cells{}",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 61e58c2..459923b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -43,10 +43,12 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ExecutorLocal;
+import org.apache.cassandra.concurrent.ExecutorLocals;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.TracingAwareExecutorService;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
@@ -801,10 +803,10 @@ public final class MessagingService implements MessagingServiceMBean
                 return;
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
-        TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
+        LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
-        stage.execute(runnable, state);
+        stage.execute(runnable, ExecutorLocals.create(state));
     }
 
     public void setCallbackForTests(int messageId, CallbackInfo callback)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/ClientWarn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
index 2ed0a6c..ddad197 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -20,54 +20,68 @@ package org.apache.cassandra.service;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.concurrent.ExecutorLocal;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class ClientWarn
+public class ClientWarn implements ExecutorLocal<ClientWarn.State>
 {
     private static final String TRUNCATED = " [truncated]";
-    private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>();
-
-    private final List<String> warnings = new ArrayList<>();
+    private static final ThreadLocal<ClientWarn.State> warnLocal = new ThreadLocal<>();
+    public static ClientWarn instance = new ClientWarn();
 
     private ClientWarn()
     {
     }
 
-    public static void warn(String text)
-    {
-        ClientWarn warner = warnLocal.get();
-        if (warner != null)
-            warner.add(text);
+    public State get() {
+        return warnLocal.get();
     }
 
-    private void add(String warning)
-    {
-        if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
-            warnings.add(maybeTruncate(warning));
+    public void set(State value) {
+        warnLocal.set(value);
     }
 
-    private static String maybeTruncate(String warning)
+    public void warn(String text)
     {
-        return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
-             ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
-             : warning;
+        State state = warnLocal.get();
+        if (state != null)
+            state.add(text);
     }
 
-    public static void captureWarnings()
+    public void captureWarnings()
     {
-        warnLocal.set(new ClientWarn());
+        warnLocal.set(new State());
     }
 
-    public static List<String> getWarnings()
+    public List<String> getWarnings()
     {
-        ClientWarn warner = warnLocal.get();
-        if (warner == null || warner.warnings.isEmpty())
+        State state = warnLocal.get();
+        if (state == null || state.warnings.isEmpty())
             return null;
-        return warner.warnings;
+        return state.warnings;
     }
 
-    public static void resetWarnings()
+    public void resetWarnings()
     {
         warnLocal.remove();
     }
+
+    public static class State
+    {
+        private final List<String> warnings = new ArrayList<>();
+
+        private void add(String warning)
+        {
+            if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
+                warnings.add(maybeTruncate(warning));
+        }
+
+        private static String maybeTruncate(String warning)
+        {
+            return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
+                   ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
+                   : warning;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 88253e3..841e980 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1842,7 +1842,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (filteredEndpoints.size() == 1
                         && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                     {
-                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
+                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
                     }
                     else
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index ccc2637..bf9cee7 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ExecutorLocal;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
@@ -41,7 +42,7 @@ import org.apache.cassandra.utils.UUIDGen;
  * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
  * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
  */
-public class Tracing
+public class Tracing implements ExecutorLocal<TraceState>
 {
     public static final String TRACE_HEADER = "TraceSession";
     public static final String TRACE_TYPE = "TraceType";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index ab794df..01a0794 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -499,14 +499,14 @@ public abstract class Message
                 assert request.connection() instanceof ServerConnection;
                 connection = (ServerConnection)request.connection();
                 if (connection.getVersion() >= Server.VERSION_4)
-                    ClientWarn.captureWarnings();
+                    ClientWarn.instance.captureWarnings();
 
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
 
                 logger.trace("Received: {}, v={}", request, connection.getVersion());
                 response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
-                response.setWarnings(ClientWarn.getWarnings());
+                response.setWarnings(ClientWarn.instance.getWarnings());
                 response.attach(connection);
                 connection.applyStateTransition(request.type, response.type);
             }
@@ -519,7 +519,7 @@ public abstract class Message
             }
             finally
             {
-                ClientWarn.resetWarnings();
+                ClientWarn.instance.resetWarnings();
             }
 
             logger.trace("Responding: {}, v={}", response, connection.getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
index 4ecd6a7..289f3e3 100644
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
 import io.netty.util.concurrent.AbstractEventExecutor;
 import io.netty.util.concurrent.EventExecutorGroup;
 import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.TracingAwareExecutorService;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
@@ -32,7 +32,7 @@ public class RequestThreadPoolExecutor extends AbstractEventExecutor
 {
     private final static int MAX_QUEUED_REQUESTS = 128;
     private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
-    private final TracingAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+    private final LocalAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
                                                                            MAX_QUEUED_REQUESTS,
                                                                            "transport",
                                                                            THREAD_FACTORY_ID);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index cfd5f7a..d22a8f6 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -24,6 +24,8 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.Server;
@@ -79,6 +81,47 @@ public class ClientWarningsTest extends CQLTester
     }
 
     @Test
+    public void testTombstoneWarning() throws Exception
+    {
+        final int iterations = 10000;
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+        {
+            client.connect(false);
+
+            for (int i = 0; i < iterations; i++)
+            {
+                QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)",
+                                                                    KEYSPACE,
+                                                                    currentTable(),
+                                                                    i), QueryOptions.DEFAULT);
+                client.execute(query);
+            }
+            ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+            store.forceBlockingFlush();
+
+            for (int i = 0; i < iterations; i++)
+            {
+                QueryMessage query = new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s",
+                                                                    KEYSPACE,
+                                                                    currentTable(),
+                                                                    i), QueryOptions.DEFAULT);
+                client.execute(query);
+            }
+            store.forceBlockingFlush();
+
+            {
+                QueryMessage query = new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1",
+                                                                    KEYSPACE,
+                                                                    currentTable()), QueryOptions.DEFAULT);
+                Message.Response resp = client.execute(query);
+                assertEquals(1, resp.getWarnings().size());
+            }
+        }
+    }
+
+    @Test
     public void testLargeBatchWithProtoV2() throws Exception
     {
         createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");