You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 16:27:51 UTC

[1/3] git commit: Fix tracing when operation completes before all responses arrive; patch by jbellis; reviewed by slebresne for CASSANDRA-5668

Updated Branches:
  refs/heads/cassandra-1.2 7dc2eb95c -> fbe8a6eb2
  refs/heads/trunk b73f9d423 -> 140b0311d


Fix tracing when operation completes before all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb

Branch: refs/heads/cassandra-1.2
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DebuggableThreadPoolExecutor.java           | 22 ++++++++---
 .../cassandra/concurrent/StageManager.java      | 29 +++++++++------
 .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  | 14 ++++---
 .../cassandra/net/OutboundTcpConnection.java    | 13 ++++++-
 .../cassandra/service/MigrationManager.java     |  7 +---
 .../cassandra/tracing/ExpiredTraceState.java    | 39 ++++++++++++++++++++
 .../apache/cassandra/tracing/TraceState.java    | 12 ++++--
 .../org/apache/cassandra/tracing/Tracing.java   | 26 +++++++------
 .../service/AntiEntropyServiceTestAbstract.java | 10 ++---
 11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
  * Fix cross-DC mutation forwarding (CASSANDRA-5632)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
  *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
  *   stage is less busy, core thread timeout is enabled.
  */
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
 {
     protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
     public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}
 
+    public void execute(Runnable command, TraceState state)
+    {
+        super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+    }
+
     // execute does not call newTaskFor
     @Override
     public void execute(Runnable command)
     {
         super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
-                      ? new TraceSessionWrapper<Object>(command, null)
+                      ? new TraceSessionWrapper<Object>(command)
                       : command);
     }
 
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         if (isTracing() && !(runnable instanceof TraceSessionWrapper))
         {
-            return new TraceSessionWrapper<T>(runnable, result);
+            return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
         }
         return super.newTaskFor(runnable, result);
     }
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         private final TraceState state;
 
-        public TraceSessionWrapper(Runnable runnable, T result)
+        public TraceSessionWrapper(Runnable command)
         {
-            super(runnable, result);
-            state = Tracing.instance().get();
+            this(command, null);
         }
 
         public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
             state = Tracing.instance().get();
         }
 
+        public TraceSessionWrapper(Runnable command, TraceState state)
+        {
+            super(command, null);
+            this.state = state;
+        }
+
         private void setupContext()
         {
             Tracing.instance().set(state);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
 {
     private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
 
-    private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+    private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
@@ -60,7 +61,7 @@ public class StageManager
         stages.put(Stage.TRACING, tracingExecutor());
     }
 
-    private static ThreadPoolExecutor tracingExecutor()
+    private static ExecuteOnlyExecutor tracingExecutor()
     {
         RejectedExecutionHandler reh = new RejectedExecutionHandler()
         {
@@ -78,7 +79,7 @@ public class StageManager
                                        reh);
     }
 
-    private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+    private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
     {
         return new JMXEnabledThreadPoolExecutor(numThreads,
                                                 KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
                                                 stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.
-    */
-    public static ThreadPoolExecutor getStage(Stage stage)
+     */
+    public static TracingAwareExecutorService getStage(Stage stage)
     {
         return stages.get(stage);
     }
@@ -132,29 +133,35 @@ public class StageManager
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
      * tracing stage.  See CASSANDRA-1123 for background.
      */
-    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
     {
         public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
         {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
         }
 
+        public void execute(Runnable command, TraceState state)
+        {
+            assert state == null;
+            super.execute(command);
+        }
+
         @Override
         public Future<?> submit(Runnable task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Runnable task, T result)
         {
-            return super.submit(task, result);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Callable<T> task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+    // we need a way to inject a TraceState directly into the Executor context without going through
+    // the global Tracing sessions; see CASSANDRA-5668
+    public void execute(Runnable command, TraceState state);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void receive(MessageIn message, String id, long timestamp)
     {
-        Tracing.instance().initializeFromMessage(message);
-        Tracing.trace("Message received from {}", message.from);
+        TraceState state = Tracing.instance().initializeFromMessage(message);
+        if (state != null)
+            state.trace("Message received from {}", message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
             return;
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
-        ExecutorService stage = StageManager.getStage(message.getMessageType());
+        TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
         if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
             }
         }
 
-        stage.execute(runnable);
+        stage.execute(runnable, state);
     }
 
     public void setCallbackForTests(String messageId, CallbackInfo callback)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
             {
                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance().get(sessionId);
-                state.trace("Sending message to {}", poolReference.endPoint());
-                Tracing.instance().stopIfNonLocal(state);
+                String message = String.format("Sending message to %s", poolReference.endPoint());
+                // session may have already finished; see CASSANDRA-5668
+                if (state == null)
+                {
+                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+                }
+                else
+                {
+                    state.trace(message);
+                    Tracing.instance().stopIfNonLocal(state);
+                }
             }
 
             write(qm.message, qm.id, qm.timestamp, out, targetVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     public static boolean isReadyForBootstrap()
     {
-        return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+        return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
     }
 
     public void notifyCreateKeyspace(KSMetaData ksm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+    public ExpiredTraceState(UUID sessionId)
+    {
+        super(FBUtilities.getBroadcastAddress(), sessionId, true);
+    }
+
+    public int elapsed()
+    {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
         trace(MessageFormatter.arrayFormat(format, args).getMessage());
     }
 
-    public void trace(final String message)
+    public void trace(String message)
     {
-        final int elapsed = elapsed();
-        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+        TraceState.trace(sessionIdBytes, message, elapsed());
+    }
 
+    public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+    {
+        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
         final String threadName = Thread.currentThread().getName();
 
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
                 ColumnFamily cf = ColumnFamily.create(cfMeta);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
-                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+                if (elapsed >= 0)
+                    Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
                 RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
                 mutation.add(cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
     }
 
     /**
-     * Updates the threads query context from a message
+     * Determines the tracing context from a message.  Does NOT set the threadlocal state.
      * 
-     * @param message
-     *            The internode message
+     * @param message The internode message
      */
-    public void initializeFromMessage(final MessageIn<?> message)
+    public TraceState initializeFromMessage(final MessageIn<?> message)
     {
         final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
 
-        // if the message has no session context header don't do tracing
         if (sessionBytes == null)
-        {
-            state.set(null);
-            return;
-        }
+            return null;
 
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = sessions.get(sessionId);
-        if (ts == null)
+        if (ts != null)
+            return ts;
+
+        if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+        {
+            // received a message for a session we've already closed out.  see CASSANDRA-5668
+            return new ExpiredTraceState(sessionId);
+        }
+        else
         {
             ts = new TraceState(message.from, sessionId, false);
             sessions.put(sessionId, ts);
+            return ts;
         }
-        state.set(ts);
     }
 
     public static void trace(String message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
+import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
     void flushAES() throws Exception
     {
-        final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+        final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
         final Callable noop = new Callable<Object>()
         {
             public Boolean call()


[3/3] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/140b0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/140b0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/140b0311

Branch: refs/heads/trunk
Commit: 140b0311df890d2258f19f3df98f1a996b6f2a6e
Parents: b73f9d4 fbe8a6e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 09:27:21 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:27:21 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DebuggableThreadPoolExecutor.java           | 22 ++++++++---
 .../cassandra/concurrent/StageManager.java      | 29 +++++++++------
 .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  | 14 ++++---
 .../cassandra/net/OutboundTcpConnection.java    | 13 ++++++-
 .../cassandra/service/MigrationManager.java     |  3 +-
 .../cassandra/tracing/ExpiredTraceState.java    | 39 ++++++++++++++++++++
 .../apache/cassandra/tracing/TraceState.java    | 12 ++++--
 .../org/apache/cassandra/tracing/Tracing.java   | 26 +++++++------
 .../service/AntiEntropyServiceTestAbstract.java |  9 +++--
 11 files changed, 156 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 044add4,593bf7c..29c6fd2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,69 -1,5 +1,70 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 + * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
 + * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
 + * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
 + * Include a timestamp with all read commands to determine column expiration
 +   (CASSANDRA-5149)
 + * Streaming 2.0 (CASSANDRA-5286)
 + * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
 +
  1.2.6
+  * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
   * Fix cross-DC mutation forwarding (CASSANDRA-5632)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 8e94f8e,2964d35..e01183c
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -55,12 -60,11 +56,12 @@@ import org.apache.cassandra.metrics.Dro
  import org.apache.cassandra.net.sink.SinkManager;
  import org.apache.cassandra.security.SSLFactory;
  import org.apache.cassandra.service.*;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.PrepareResponse;
  import org.apache.cassandra.streaming.*;
 -import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+ import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.*;
- import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
  public final class MessagingService implements MessagingServiceMBean
  {
@@@ -677,23 -701,38 +678,24 @@@
          }
      }
  
 -    public void receive(MessageIn message, String id, long timestamp)
 +    public void receive(MessageIn message, int id, long timestamp)
      {
-         Tracing.instance().initializeFromMessage(message);
-         Tracing.trace("Message received from {}", message.from);
+         TraceState state = Tracing.instance().initializeFromMessage(message);
+         if (state != null)
+             state.trace("Message received from {}", message.from);
  
          message = SinkManager.processInboundMessage(message, id);
          if (message == null)
              return;
  
          Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
-         ExecutorService stage = StageManager.getStage(message.getMessageType());
+         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
          assert stage != null : "No stage for message type " + message.verb;
  
-         stage.execute(runnable);
 -        if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
 -        {
 -            IMessageCallback cb = MessagingService.instance().getRegisteredCallback(id).callback;
 -
 -            if (cb instanceof AbstractWriteResponseHandler)
 -            {
 -                PBSPredictor.instance().logWriteResponse(id, timestamp);
 -            }
 -            else if (cb instanceof ReadCallback)
 -            {
 -                PBSPredictor.instance().logReadResponse(id, timestamp);
 -            }
 -        }
 -
+         stage.execute(runnable, state);
      }
  
 -    public void setCallbackForTests(String messageId, CallbackInfo callback)
 +    public void setCallbackForTests(int messageId, CallbackInfo callback)
      {
          callbacks.put(messageId, callback);
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 52a415c,ee30d36..eaab3ad
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -182,15 -179,25 +182,24 @@@ public class OutboundTcpConnection exte
              {
                  UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                  TraceState state = Tracing.instance().get(sessionId);
-                 state.trace("Sending message to {}", poolReference.endPoint());
-                 Tracing.instance().stopIfNonLocal(state);
+                 String message = String.format("Sending message to %s", poolReference.endPoint());
+                 // session may have already finished; see CASSANDRA-5668
+                 if (state == null)
+                 {
+                     TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+                 }
+                 else
+                 {
+                     state.trace(message);
+                     Tracing.instance().stopIfNonLocal(state);
+                 }
              }
  
 -            write(qm.message, qm.id, qm.timestamp, out, targetVersion);
 +            writeInternal(qm.message, qm.id, qm.timestamp);
 +
              completed++;
              if (active.peek() == null)
 -            {
                  out.flush();
 -            }
          }
          catch (Exception e)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationManager.java
index d602d25,de34785..cce674f
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@@ -23,9 -23,7 +23,10 @@@ import java.io.IOException
  import java.net.InetAddress;
  import java.nio.ByteBuffer;
  import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.*;
  
  import java.lang.management.ManagementFactory;
  import java.lang.management.RuntimeMXBean;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 52e6d04,25599c4..b4cff93
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -91,12 -96,14 +94,13 @@@ public class TraceStat
              public void runMayThrow() throws Exception
              {
                  CFMetaData cfMeta = CFMetaData.TraceEventsCf;
 -                ColumnFamily cf = ColumnFamily.create(cfMeta);
 +                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
 +                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
                  Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
-                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
 -                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
+                 if (elapsed >= 0)
+                     Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
 -                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
 -                RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
 -                mutation.add(cf);
 +                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
 +                RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf);
                  StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
              }
          });

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/Tracing.java
index 5637f48,eb5bad9..8782ee5
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@@ -33,9 -33,13 +33,10 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.ColumnNameBuilder;
 -import org.apache.cassandra.db.ColumnFamily;
 -import org.apache.cassandra.db.ConsistencyLevel;
 -import org.apache.cassandra.db.ExpiringColumn;
 -import org.apache.cassandra.db.RowMutation;
 +import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.marshal.TimeUUIDType;
  import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageProxy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 79aed42,b80c272..c930cc3
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@@ -48,6 -48,7 +48,7 @@@ import org.apache.cassandra.utils.ByteB
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MerkleTree;
  
 -import static org.apache.cassandra.service.AntiEntropyService.*;
++import static org.apache.cassandra.service.ActiveRepairService.*;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
  


[2/3] git commit: Fix tracing when operation completes before all responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668

Posted by jb...@apache.org.
Fix tracing when operation completes before all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb

Branch: refs/heads/trunk
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DebuggableThreadPoolExecutor.java           | 22 ++++++++---
 .../cassandra/concurrent/StageManager.java      | 29 +++++++++------
 .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  | 14 ++++---
 .../cassandra/net/OutboundTcpConnection.java    | 13 ++++++-
 .../cassandra/service/MigrationManager.java     |  7 +---
 .../cassandra/tracing/ExpiredTraceState.java    | 39 ++++++++++++++++++++
 .../apache/cassandra/tracing/TraceState.java    | 12 ++++--
 .../org/apache/cassandra/tracing/Tracing.java   | 26 +++++++------
 .../service/AntiEntropyServiceTestAbstract.java | 10 ++---
 11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
  * Fix cross-DC mutation forwarding (CASSANDRA-5632)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
  *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
  *   stage is less busy, core thread timeout is enabled.
  */
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
 {
     protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
     public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}
 
+    public void execute(Runnable command, TraceState state)
+    {
+        super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+    }
+
     // execute does not call newTaskFor
     @Override
     public void execute(Runnable command)
     {
         super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
-                      ? new TraceSessionWrapper<Object>(command, null)
+                      ? new TraceSessionWrapper<Object>(command)
                       : command);
     }
 
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         if (isTracing() && !(runnable instanceof TraceSessionWrapper))
         {
-            return new TraceSessionWrapper<T>(runnable, result);
+            return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
         }
         return super.newTaskFor(runnable, result);
     }
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         private final TraceState state;
 
-        public TraceSessionWrapper(Runnable runnable, T result)
+        public TraceSessionWrapper(Runnable command)
         {
-            super(runnable, result);
-            state = Tracing.instance().get();
+            this(command, null);
         }
 
         public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
             state = Tracing.instance().get();
         }
 
+        public TraceSessionWrapper(Runnable command, TraceState state)
+        {
+            super(command, null);
+            this.state = state;
+        }
+
         private void setupContext()
         {
             Tracing.instance().set(state);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
 {
     private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
 
-    private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+    private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
@@ -60,7 +61,7 @@ public class StageManager
         stages.put(Stage.TRACING, tracingExecutor());
     }
 
-    private static ThreadPoolExecutor tracingExecutor()
+    private static ExecuteOnlyExecutor tracingExecutor()
     {
         RejectedExecutionHandler reh = new RejectedExecutionHandler()
         {
@@ -78,7 +79,7 @@ public class StageManager
                                        reh);
     }
 
-    private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+    private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
     {
         return new JMXEnabledThreadPoolExecutor(numThreads,
                                                 KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
                                                 stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.
-    */
-    public static ThreadPoolExecutor getStage(Stage stage)
+     */
+    public static TracingAwareExecutorService getStage(Stage stage)
     {
         return stages.get(stage);
     }
@@ -132,29 +133,35 @@ public class StageManager
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
      * tracing stage.  See CASSANDRA-1123 for background.
      */
-    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
     {
         public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
         {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
         }
 
+        public void execute(Runnable command, TraceState state)
+        {
+            assert state == null;
+            super.execute(command);
+        }
+
         @Override
         public Future<?> submit(Runnable task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Runnable task, T result)
         {
-            return super.submit(task, result);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Callable<T> task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+    // we need a way to inject a TraceState directly into the Executor context without going through
+    // the global Tracing sessions; see CASSANDRA-5668
+    public void execute(Runnable command, TraceState state);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void receive(MessageIn message, String id, long timestamp)
     {
-        Tracing.instance().initializeFromMessage(message);
-        Tracing.trace("Message received from {}", message.from);
+        TraceState state = Tracing.instance().initializeFromMessage(message);
+        if (state != null)
+            state.trace("Message received from {}", message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
             return;
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
-        ExecutorService stage = StageManager.getStage(message.getMessageType());
+        TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
         if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
             }
         }
 
-        stage.execute(runnable);
+        stage.execute(runnable, state);
     }
 
     public void setCallbackForTests(String messageId, CallbackInfo callback)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
             {
                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance().get(sessionId);
-                state.trace("Sending message to {}", poolReference.endPoint());
-                Tracing.instance().stopIfNonLocal(state);
+                String message = String.format("Sending message to %s", poolReference.endPoint());
+                // session may have already finished; see CASSANDRA-5668
+                if (state == null)
+                {
+                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+                }
+                else
+                {
+                    state.trace(message);
+                    Tracing.instance().stopIfNonLocal(state);
+                }
             }
 
             write(qm.message, qm.id, qm.timestamp, out, targetVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     public static boolean isReadyForBootstrap()
     {
-        return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+        return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
     }
 
     public void notifyCreateKeyspace(KSMetaData ksm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+    public ExpiredTraceState(UUID sessionId)
+    {
+        super(FBUtilities.getBroadcastAddress(), sessionId, true);
+    }
+
+    public int elapsed()
+    {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
         trace(MessageFormatter.arrayFormat(format, args).getMessage());
     }
 
-    public void trace(final String message)
+    public void trace(String message)
     {
-        final int elapsed = elapsed();
-        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+        TraceState.trace(sessionIdBytes, message, elapsed());
+    }
 
+    public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+    {
+        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
         final String threadName = Thread.currentThread().getName();
 
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
                 ColumnFamily cf = ColumnFamily.create(cfMeta);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
-                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+                if (elapsed >= 0)
+                    Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
                 RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
                 mutation.add(cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
     }
 
     /**
-     * Updates the threads query context from a message
+     * Determines the tracing context from a message.  Does NOT set the threadlocal state.
      * 
-     * @param message
-     *            The internode message
+     * @param message The internode message
      */
-    public void initializeFromMessage(final MessageIn<?> message)
+    public TraceState initializeFromMessage(final MessageIn<?> message)
     {
         final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
 
-        // if the message has no session context header don't do tracing
         if (sessionBytes == null)
-        {
-            state.set(null);
-            return;
-        }
+            return null;
 
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = sessions.get(sessionId);
-        if (ts == null)
+        if (ts != null)
+            return ts;
+
+        if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+        {
+            // received a message for a session we've already closed out.  see CASSANDRA-5668
+            return new ExpiredTraceState(sessionId);
+        }
+        else
         {
             ts = new TraceState(message.from, sessionId, false);
             sessions.put(sessionId, ts);
+            return ts;
         }
-        state.set(ts);
     }
 
     public static void trace(String message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
+import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
     void flushAES() throws Exception
     {
-        final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+        final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
         final Callable noop = new Callable<Object>()
         {
             public Boolean call()