You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/13 20:48:54 UTC
[03/13] cassandra git commit: Make sure client gets tombstone overwhelmed warning
Make sure client gets tombstone overwhelmed warning
patch by Carl Yeksigian; reviewed by Josh McKenzie for CASSANDRA-9465
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbf6e62c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbf6e62c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbf6e62c
Branch: refs/heads/cassandra-3.3
Commit: dbf6e62c382d62f9c1727ecf5afb90d131a81775
Parents: 582bdba
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jan 13 13:22:36 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jan 13 14:35:59 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../AbstractLocalAwareExecutorService.java | 229 +++++++++++++++++++
.../AbstractTracingAwareExecutorService.java | 229 -------------------
.../DebuggableThreadPoolExecutor.java | 48 ++--
.../cassandra/concurrent/ExecutorLocal.java | 44 ++++
.../cassandra/concurrent/ExecutorLocals.java | 84 +++++++
.../concurrent/LocalAwareExecutorService.java | 34 +++
.../cassandra/concurrent/SEPExecutor.java | 3 +-
.../concurrent/SharedExecutorPool.java | 2 +-
.../cassandra/concurrent/StageManager.java | 12 +-
.../concurrent/TracingAwareExecutorService.java | 36 ---
.../cql3/statements/BatchStatement.java | 11 +-
.../cql3/statements/SelectStatement.java | 4 +-
.../cassandra/db/filter/SliceQueryFilter.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 8 +-
.../apache/cassandra/service/ClientWarn.java | 62 +++--
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../org/apache/cassandra/tracing/Tracing.java | 3 +-
.../org/apache/cassandra/transport/Message.java | 6 +-
.../transport/RequestThreadPoolExecutor.java | 4 +-
.../cassandra/service/ClientWarningsTest.java | 43 ++++
21 files changed, 529 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11f2529..6530956 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465)
* Fix error streaming section more than 2GB (CASSANDRA-10961)
* (cqlsh) Also apply --connect-timeout to control connection
timeout (CASSANDRA-10959)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
new file mode 100644
index 0000000..088b43e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.tracing.Tracing.isTracing;
+
+public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class);
+
+ protected abstract void addTask(FutureTask<?> futureTask);
+ protected abstract void onCompletion();
+
+ /** Task Submission / Creation / Objects **/
+
+ public <T> FutureTask<T> submit(Callable<T> task)
+ {
+ return submit(newTaskFor(task));
+ }
+
+ public FutureTask<?> submit(Runnable task)
+ {
+ return submit(newTaskFor(task, null));
+ }
+
+ public <T> FutureTask<T> submit(Runnable task, T result)
+ {
+ return submit(newTaskFor(task, result));
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+ {
+ return newTaskFor(runnable, result, ExecutorLocals.create());
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals)
+ {
+ if (locals != null)
+ {
+ if (runnable instanceof LocalSessionFutureTask)
+ return (LocalSessionFutureTask<T>) runnable;
+ return new LocalSessionFutureTask<T>(runnable, result, locals);
+ }
+ if (runnable instanceof FutureTask)
+ return (FutureTask<T>) runnable;
+ return new FutureTask<>(runnable, result);
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+ {
+ if (isTracing())
+ {
+ if (callable instanceof LocalSessionFutureTask)
+ return (LocalSessionFutureTask<T>) callable;
+ return new LocalSessionFutureTask<T>(callable, ExecutorLocals.create());
+ }
+ if (callable instanceof FutureTask)
+ return (FutureTask<T>) callable;
+ return new FutureTask<>(callable);
+ }
+
+ private class LocalSessionFutureTask<T> extends FutureTask<T>
+ {
+ private final ExecutorLocals locals;
+
+ public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals)
+ {
+ super(callable);
+ this.locals = locals;
+ }
+
+ public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals)
+ {
+ super(runnable, result);
+ this.locals = locals;
+ }
+
+ public void run()
+ {
+ ExecutorLocals old = ExecutorLocals.create();
+ ExecutorLocals.set(locals);
+ try
+ {
+ super.run();
+ }
+ finally
+ {
+ ExecutorLocals.set(old);
+ }
+ }
+ }
+
+ class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+ {
+ private boolean failure;
+ private Object result = this;
+ private final Callable<T> callable;
+
+ public FutureTask(Callable<T> callable)
+ {
+ this.callable = callable;
+ }
+ public FutureTask(Runnable runnable, T result)
+ {
+ this(Executors.callable(runnable, result));
+ }
+
+ public void run()
+ {
+ try
+ {
+ result = callable.call();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
+ result = t;
+ failure = true;
+ }
+ finally
+ {
+ signalAll();
+ onCompletion();
+ }
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return isSignaled();
+ }
+
+ public T get() throws InterruptedException, ExecutionException
+ {
+ await();
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ await(timeout, unit);
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+ }
+
+ private <T> FutureTask<T> submit(FutureTask<T> task)
+ {
+ addTask(task);
+ return task;
+ }
+
+ public void execute(Runnable command)
+ {
+ addTask(newTaskFor(command, ExecutorLocals.create()));
+ }
+
+ public void execute(Runnable command, ExecutorLocals locals)
+ {
+ addTask(newTaskFor(command, null, locals));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
deleted file mode 100644
index fb753b0..0000000
--- a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-import static org.apache.cassandra.tracing.Tracing.isTracing;
-
-public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService
-{
- private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class);
-
- protected abstract void addTask(FutureTask<?> futureTask);
- protected abstract void onCompletion();
-
- /** Task Submission / Creation / Objects **/
-
- public <T> FutureTask<T> submit(Callable<T> task)
- {
- return submit(newTaskFor(task));
- }
-
- public FutureTask<?> submit(Runnable task)
- {
- return submit(newTaskFor(task, null));
- }
-
- public <T> FutureTask<T> submit(Runnable task, T result)
- {
- return submit(newTaskFor(task, result));
- }
-
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- {
- throw new UnsupportedOperationException();
- }
-
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
- {
- throw new UnsupportedOperationException();
- }
-
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
- {
- throw new UnsupportedOperationException();
- }
-
- protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
- {
- return newTaskFor(runnable, result, Tracing.instance.get());
- }
-
- protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState)
- {
- if (traceState != null)
- {
- if (runnable instanceof TraceSessionFutureTask)
- return (TraceSessionFutureTask<T>) runnable;
- return new TraceSessionFutureTask<T>(runnable, result, traceState);
- }
- if (runnable instanceof FutureTask)
- return (FutureTask<T>) runnable;
- return new FutureTask<>(runnable, result);
- }
-
- protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
- {
- if (isTracing())
- {
- if (callable instanceof TraceSessionFutureTask)
- return (TraceSessionFutureTask<T>) callable;
- return new TraceSessionFutureTask<T>(callable, Tracing.instance.get());
- }
- if (callable instanceof FutureTask)
- return (FutureTask<T>) callable;
- return new FutureTask<>(callable);
- }
-
- private class TraceSessionFutureTask<T> extends FutureTask<T>
- {
- private final TraceState state;
-
- public TraceSessionFutureTask(Callable<T> callable, TraceState state)
- {
- super(callable);
- this.state = state;
- }
-
- public TraceSessionFutureTask(Runnable runnable, T result, TraceState state)
- {
- super(runnable, result);
- this.state = state;
- }
-
- public void run()
- {
- TraceState oldState = Tracing.instance.get();
- Tracing.instance.set(state);
- try
- {
- super.run();
- }
- finally
- {
- Tracing.instance.set(oldState);
- }
- }
- }
-
- class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
- {
- private boolean failure;
- private Object result = this;
- private final Callable<T> callable;
-
- public FutureTask(Callable<T> callable)
- {
- this.callable = callable;
- }
- public FutureTask(Runnable runnable, T result)
- {
- this(Executors.callable(runnable, result));
- }
-
- public void run()
- {
- try
- {
- result = callable.call();
- }
- catch (Throwable t)
- {
- JVMStabilityInspector.inspectThrowable(t);
- logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
- result = t;
- failure = true;
- }
- finally
- {
- signalAll();
- onCompletion();
- }
- }
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return false;
- }
-
- public boolean isCancelled()
- {
- return false;
- }
-
- public boolean isDone()
- {
- return isSignaled();
- }
-
- public T get() throws InterruptedException, ExecutionException
- {
- await();
- Object result = this.result;
- if (failure)
- throw new ExecutionException((Throwable) result);
- return (T) result;
- }
-
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
- {
- await(timeout, unit);
- Object result = this.result;
- if (failure)
- throw new ExecutionException((Throwable) result);
- return (T) result;
- }
- }
-
- private <T> FutureTask<T> submit(FutureTask<T> task)
- {
- addTask(task);
- return task;
- }
-
- public void execute(Runnable command)
- {
- addTask(newTaskFor(command, null));
- }
-
- public void execute(Runnable command, TraceState state)
- {
- addTask(newTaskFor(command, null, state));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index a6d0049..1fb0690 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled.
*/
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
{
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -146,11 +146,11 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
- public void execute(Runnable command, TraceState state)
+ public void execute(Runnable command, ExecutorLocals locals)
{
- super.execute(state == null || command instanceof TraceSessionWrapper
+ super.execute(locals == null || command instanceof LocalSessionWrapper
? command
- : new TraceSessionWrapper<Object>(command, state));
+ : new LocalSessionWrapper<Object>(command, locals));
}
public void maybeExecuteImmediately(Runnable command)
@@ -162,17 +162,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
@Override
public void execute(Runnable command)
{
- super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
- ? new TraceSessionWrapper<Object>(Executors.callable(command, null))
+ super.execute(isTracing() && !(command instanceof LocalSessionWrapper)
+ ? new LocalSessionWrapper<Object>(Executors.callable(command, null))
: command);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result)
{
- if (isTracing() && !(runnable instanceof TraceSessionWrapper))
+ if (isTracing() && !(runnable instanceof LocalSessionWrapper))
{
- return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
+ return new LocalSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
@@ -180,9 +180,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
- if (isTracing() && !(callable instanceof TraceSessionWrapper))
+ if (isTracing() && !(callable instanceof LocalSessionWrapper))
{
- return new TraceSessionWrapper<T>(callable);
+ return new LocalSessionWrapper<T>(callable);
}
return super.newTaskFor(callable);
}
@@ -198,9 +198,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
protected static void maybeResetTraceSessionWrapper(Runnable r)
{
- if (r instanceof TraceSessionWrapper)
+ if (r instanceof LocalSessionWrapper)
{
- TraceSessionWrapper tsw = (TraceSessionWrapper) r;
+ LocalSessionWrapper tsw = (LocalSessionWrapper) r;
// we have to reset trace state as its presence is what denotes the current thread is tracing
// and if left this thread might start tracing unrelated tasks
tsw.reset();
@@ -210,8 +210,8 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
@Override
protected void beforeExecute(Thread t, Runnable r)
{
- if (r instanceof TraceSessionWrapper)
- ((TraceSessionWrapper) r).setupContext();
+ if (r instanceof LocalSessionWrapper)
+ ((LocalSessionWrapper) r).setupContext();
super.beforeExecute(t, r);
}
@@ -278,35 +278,35 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
- * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the TraceSessionContext and move
- * it into the worker thread.
+ * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the ExecutorLocals and move
+ * them into the worker thread.
*
* @param <T>
*/
- private static class TraceSessionWrapper<T> extends FutureTask<T>
+ private static class LocalSessionWrapper<T> extends FutureTask<T>
{
- private final TraceState state;
+ private final ExecutorLocals locals;
- public TraceSessionWrapper(Callable<T> callable)
+ public LocalSessionWrapper(Callable<T> callable)
{
super(callable);
- state = Tracing.instance.get();
+ locals = ExecutorLocals.create();
}
- public TraceSessionWrapper(Runnable command, TraceState state)
+ public LocalSessionWrapper(Runnable command, ExecutorLocals locals)
{
super(command, null);
- this.state = state;
+ this.locals = locals;
}
private void setupContext()
{
- Tracing.instance.set(state);
+ ExecutorLocals.set(locals);
}
private void reset()
{
- Tracing.instance.set(null);
+ ExecutorLocals.set(null);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
new file mode 100644
index 0000000..47826f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+
+public interface ExecutorLocal<T>
+{
+ ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance };
+
+ /**
+ * This is called when scheduling the task, and also before calling {@link ExecutorLocal#set(T)} when running on a
+ * executor thread.
+ *
+ * @return The thread-local value that we want to copy across executor boundaries; may be null if not set.
+ */
+ T get();
+
+ /**
+ * Before a task has been run, this will be called with the value from the thread that scheduled the task, and after
+ * the task is finished, the value that was previously retrieved from this thread is restored.
+ *
+ * @param value Value to use for the executor local state; may be null.
+ */
+ void set(T value);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
new file mode 100644
index 0000000..8e6d6ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+
+/*
+ * This class only knows about Tracing and ClientWarn, so if any different executor locals are added, it must be
+ * updated.
+ *
+ * We don't enumerate the ExecutorLocal.all array each time because it would be much slower.
+ */
+public class ExecutorLocals
+{
+ private static final ExecutorLocal<TraceState> tracing = Tracing.instance;
+ private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance;
+
+ public final TraceState traceState;
+ public final ClientWarn.State clientWarnState;
+
+ private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState)
+ {
+ this.traceState = traceState;
+ this.clientWarnState = clientWarnState;
+ }
+
+ static
+ {
+ assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn })
+ : "ExecutorLocals has not been updated to reflect new ExecutorLocal.all";
+ }
+
+ /**
+ * This creates a new ExecutorLocals object based on what is already set.
+ *
+ * @return an ExecutorLocals object which has the trace state and client warn state captured if either has been set,
+ * or null if both are unset. The null result short-circuits logic in
+ * {@link AbstractLocalAwareExecutorService#newTaskFor(Runnable, Object, ExecutorLocals)}, preventing
+ * unnecessarily calling {@link ExecutorLocals#set(ExecutorLocals)}.
+ */
+ public static ExecutorLocals create()
+ {
+ TraceState traceState = tracing.get();
+ ClientWarn.State clientWarnState = clientWarn.get();
+ if (traceState == null && clientWarnState == null)
+ return null;
+ else
+ return new ExecutorLocals(traceState, clientWarnState);
+ }
+
+ public static ExecutorLocals create(TraceState traceState)
+ {
+ ClientWarn.State clientWarnState = clientWarn.get();
+ return new ExecutorLocals(traceState, clientWarnState);
+ }
+
+ public static void set(ExecutorLocals locals)
+ {
+ TraceState traceState = locals == null ? null : locals.traceState;
+ ClientWarn.State clientWarnState = locals == null ? null : locals.clientWarnState;
+ tracing.set(traceState);
+ clientWarn.set(clientWarnState);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
new file mode 100644
index 0000000..5577d59
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+public interface LocalAwareExecutorService extends ExecutorService
+{
+ // we need a way to inject a TraceState directly into the Executor context without going through
+ // the global Tracing sessions; see CASSANDRA-5668
+ public void execute(Runnable command, ExecutorLocals locals);
+
+ // permits executing in the context of the submitting thread
+ public void maybeExecuteImmediately(Runnable command);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index d9a0fa8..8b12b82 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.metrics.SEPMetrics;
@@ -30,7 +29,7 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
import static org.apache.cassandra.concurrent.SEPWorker.Work;
-public class SEPExecutor extends AbstractTracingAwareExecutorService
+public class SEPExecutor extends AbstractLocalAwareExecutorService
{
private final SharedExecutorPool pool;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 8c18c44..dfd7011 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -103,7 +103,7 @@ public class SharedExecutorPool
schedule(Work.SPINNING);
}
- public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
+ public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
{
SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name);
executors.add(executor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 4f03fd5..114795e 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -39,7 +39,7 @@ public class StageManager
{
private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
- private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
+ private static final EnumMap<Stage, LocalAwareExecutorService> stages = new EnumMap<Stage, LocalAwareExecutorService>(Stage.class);
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
@@ -87,7 +87,7 @@ public class StageManager
stage.getJmxType());
}
- private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
+ private static LocalAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
{
return SharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxType(), stage.getJmxName());
}
@@ -96,7 +96,7 @@ public class StageManager
* Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved.
*/
- public static TracingAwareExecutorService getStage(Stage stage)
+ public static LocalAwareExecutorService getStage(Stage stage)
{
return stages.get(stage);
}
@@ -116,16 +116,16 @@ public class StageManager
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See CASSANDRA-1123 for background.
*/
- private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
+ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
{
public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
- public void execute(Runnable command, TraceState state)
+ public void execute(Runnable command, ExecutorLocals locals)
{
- assert state == null;
+ assert locals == null;
super.execute(command);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
deleted file mode 100644
index f580fea..0000000
--- a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.tracing.TraceState;
-
-public interface TracingAwareExecutorService extends ExecutorService
-{
- // we need a way to inject a TraceState directly into the Executor context without going through
- // the global Tracing sessions; see CASSANDRA-5668
- public void execute(Runnable command, TraceState state);
-
- // permits executing in the context of the submitting thread
- public void maybeExecuteImmediately(Runnable command);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 43a80bb..a289ad1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -273,7 +273,7 @@ public class BatchStatement implements CQLStatement
{
logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
}
- ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {ksCfPairs, size, warnThreshold, size - warnThreshold, ""}).getMessage());
+ ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage());
}
}
@@ -305,8 +305,13 @@ public class BatchStatement implements CQLStatement
keySet.size(), keySet.size() == 1 ? "" : "s",
ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
- ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
- ksCfPairs.size() == 1 ? "" : "s", ksCfPairs}).getMessage());
+ ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning,
+ new Object[]{
+ keySet.size(),
+ keySet.size() == 1 ? "" : "s",
+ ksCfPairs.size() == 1 ? "" : "s",
+ ksCfPairs
+ }).getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5f142ce..5cfa94b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -251,12 +251,12 @@ public class SelectStatement implements CQLStatement
if (!restrictions.hasPartitionKeyRestrictions())
{
logger.warn("Aggregation query used without partition key");
- ClientWarn.warn("Aggregation query used without partition key");
+ ClientWarn.instance.warn("Aggregation query used without partition key");
}
else if (restrictions.keyIsInRelation())
{
logger.warn("Aggregation query used on multiple partition keys (IN restriction)");
- ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)");
+ ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)");
}
Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index e1a68e7..d2f0bf4 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -303,7 +303,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
container.metadata().getKeyValidator().getString(key.getKey()),
count,
getSlicesInfo(container));
- ClientWarn.warn(msg);
+ ClientWarn.instance.warn(msg);
logger.warn(msg);
}
Tracing.trace("Read {} live and {} tombstone cells{}",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 61e58c2..459923b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -43,10 +43,12 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ExecutorLocal;
+import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.TracingAwareExecutorService;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
@@ -801,10 +803,10 @@ public final class MessagingService implements MessagingServiceMBean
return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
- TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
+ LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
- stage.execute(runnable, state);
+ stage.execute(runnable, ExecutorLocals.create(state));
}
public void setCallbackForTests(int messageId, CallbackInfo callback)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/ClientWarn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
index 2ed0a6c..ddad197 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -20,54 +20,68 @@ package org.apache.cassandra.service;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.concurrent.ExecutorLocal;
import org.apache.cassandra.utils.FBUtilities;
-public class ClientWarn
+public class ClientWarn implements ExecutorLocal<ClientWarn.State>
{
private static final String TRUNCATED = " [truncated]";
- private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>();
-
- private final List<String> warnings = new ArrayList<>();
+ private static final ThreadLocal<ClientWarn.State> warnLocal = new ThreadLocal<>();
+ public static ClientWarn instance = new ClientWarn();
private ClientWarn()
{
}
- public static void warn(String text)
- {
- ClientWarn warner = warnLocal.get();
- if (warner != null)
- warner.add(text);
+ public State get() {
+ return warnLocal.get();
}
- private void add(String warning)
- {
- if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
- warnings.add(maybeTruncate(warning));
+ public void set(State value) {
+ warnLocal.set(value);
}
- private static String maybeTruncate(String warning)
+ public void warn(String text)
{
- return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
- ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
- : warning;
+ State state = warnLocal.get();
+ if (state != null)
+ state.add(text);
}
- public static void captureWarnings()
+ public void captureWarnings()
{
- warnLocal.set(new ClientWarn());
+ warnLocal.set(new State());
}
- public static List<String> getWarnings()
+ public List<String> getWarnings()
{
- ClientWarn warner = warnLocal.get();
- if (warner == null || warner.warnings.isEmpty())
+ State state = warnLocal.get();
+ if (state == null || state.warnings.isEmpty())
return null;
- return warner.warnings;
+ return state.warnings;
}
- public static void resetWarnings()
+ public void resetWarnings()
{
warnLocal.remove();
}
+
+ public static class State
+ {
+ private final List<String> warnings = new ArrayList<>();
+
+ private void add(String warning)
+ {
+ if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
+ warnings.add(maybeTruncate(warning));
+ }
+
+ private static String maybeTruncate(String warning)
+ {
+ return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
+ ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
+ : warning;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 88253e3..841e980 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1842,7 +1842,7 @@ public class StorageProxy implements StorageProxyMBean
if (filteredEndpoints.size() == 1
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
{
- StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
+ StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index ccc2637..bf9cee7 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ExecutorLocal;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
@@ -41,7 +42,7 @@ import org.apache.cassandra.utils.UUIDGen;
* A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
* have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
*/
-public class Tracing
+public class Tracing implements ExecutorLocal<TraceState>
{
public static final String TRACE_HEADER = "TraceSession";
public static final String TRACE_TYPE = "TraceType";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index ab794df..01a0794 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -499,14 +499,14 @@ public abstract class Message
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection)request.connection();
if (connection.getVersion() >= Server.VERSION_4)
- ClientWarn.captureWarnings();
+ ClientWarn.instance.captureWarnings();
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
logger.trace("Received: {}, v={}", request, connection.getVersion());
response = request.execute(qstate);
response.setStreamId(request.getStreamId());
- response.setWarnings(ClientWarn.getWarnings());
+ response.setWarnings(ClientWarn.instance.getWarnings());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);
}
@@ -519,7 +519,7 @@ public abstract class Message
}
finally
{
- ClientWarn.resetWarnings();
+ ClientWarn.instance.resetWarnings();
}
logger.trace("Responding: {}, v={}", response, connection.getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
index 4ecd6a7..289f3e3 100644
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.TracingAwareExecutorService;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
@@ -32,7 +32,7 @@ public class RequestThreadPoolExecutor extends AbstractEventExecutor
{
private final static int MAX_QUEUED_REQUESTS = 128;
private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
- private final TracingAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+ private final LocalAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
MAX_QUEUED_REQUESTS,
"transport",
THREAD_FACTORY_ID);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index cfd5f7a..d22a8f6 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -24,6 +24,8 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.Server;
@@ -79,6 +81,47 @@ public class ClientWarningsTest extends CQLTester
}
@Test
+ public void testTombstoneWarning() throws Exception
+ {
+ final int iterations = 10000;
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+ {
+ client.connect(false);
+
+ for (int i = 0; i < iterations; i++)
+ {
+ QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)",
+ KEYSPACE,
+ currentTable(),
+ i), QueryOptions.DEFAULT);
+ client.execute(query);
+ }
+ ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+ store.forceBlockingFlush();
+
+ for (int i = 0; i < iterations; i++)
+ {
+ QueryMessage query = new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s",
+ KEYSPACE,
+ currentTable(),
+ i), QueryOptions.DEFAULT);
+ client.execute(query);
+ }
+ store.forceBlockingFlush();
+
+ {
+ QueryMessage query = new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1",
+ KEYSPACE,
+ currentTable()), QueryOptions.DEFAULT);
+ Message.Response resp = client.execute(query);
+ assertEquals(1, resp.getWarnings().size());
+ }
+ }
+ }
+
+ @Test
public void testLargeBatchWithProtoV2() throws Exception
{
createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");