You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/25 17:05:38 UTC

git commit: add OTC tracing support for replica nodes too

Updated Branches:
  refs/heads/trunk 6adf52c94 -> 10777f2cc


add OTC tracing support for replica nodes too


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10777f2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10777f2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10777f2c

Branch: refs/heads/trunk
Commit: 10777f2cc8d0ab51fd760427a6f11a1d41aee596
Parents: 6adf52c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 25 10:05:35 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 25 10:05:35 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadVerbHandler.java   |    1 +
 .../cassandra/net/OutboundTcpConnection.java       |    5 ++-
 .../cassandra/service/RangeSliceVerbHandler.java   |    1 +
 .../org/apache/cassandra/tracing/TraceState.java   |    4 +-
 src/java/org/apache/cassandra/tracing/Tracing.java |   38 +++++++++++---
 5 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index c0814a1..d4af0be 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ReadVerbHandler implements IVerbHandler<ReadCommand>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1093f70..f79e4a0 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -170,8 +171,10 @@ public class OutboundTcpConnection extends Thread
             byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER);
             if (sessionBytes != null)
             {
-                Tracing.instance().continueExistingSession(UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)));
+                UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
+                Tracing.instance().continueExistingSession(sessionId);
                 logger.debug("Sending message to {}", poolReference.endPoint());
+                Tracing.instance().maybeStopNonlocalSession(sessionId);
             }
 
             write(qm.message, qm.id, qm.timestamp, out, targetVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index fd26760..8727005 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
 
 public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index ae0fa20..e305471 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -36,14 +36,16 @@ public class TraceState
     public final InetAddress coordinator;
     public final Stopwatch watch;
     public final ByteBuffer sessionIdBytes;
+    public final boolean isLocallyOwned;
 
-    public TraceState(InetAddress coordinator, UUID sessionId)
+    public TraceState(InetAddress coordinator, UUID sessionId, boolean locallyOwned)
     {
         assert coordinator != null;
         assert sessionId != null;
 
         this.coordinator = coordinator;
         this.sessionId = sessionId;
+        this.isLocallyOwned = locallyOwned;
         sessionIdBytes = ByteBufferUtil.bytes(sessionId);
         watch = new Stopwatch();
         watch.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index df49960..f2f8869 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -80,8 +80,7 @@ public class Tracing
 
     private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>();
 
-    /** sessions that were initiated on this node */
-    private final Map<UUID, TraceState> initiatedSessions = new ConcurrentHashMap<UUID, TraceState>();
+    private final Map<UUID, TraceState> sessions = new ConcurrentHashMap<UUID, TraceState>();
 
     public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value)
     {
@@ -153,13 +152,30 @@ public class Tracing
     {
         assert state.get() == null;
 
-        TraceState ts = new TraceState(localAddress, sessionId);
+        TraceState ts = new TraceState(localAddress, sessionId, true);
         state.set(ts);
-        initiatedSessions.put(sessionId, ts);
+        sessions.put(sessionId, ts);
 
         return sessionId;
     }
 
+    /**
+     * Removes the state data but does not log it as complete.
+     * For use by replica nodes, after replying to the master.
+     *
+     * Note: checking that the session exists is the job of the caller.
+     */
+    public void maybeStopNonlocalSession(UUID sessionId)
+    {
+        TraceState state = sessions.get(sessionId);
+        assert state != null;
+        if (!state.isLocallyOwned)
+            sessions.remove(state.sessionId);
+    }
+
+    /**
+     * Stop the session and record its complete.  Called by coodinator when request is complete.
+     */
     public void stopSession()
     {
         TraceState state = this.state.get();
@@ -186,7 +202,7 @@ public class Tracing
                 }
             });
 
-            initiatedSessions.remove(state.sessionId);
+            sessions.remove(state.sessionId);
             this.state.set(null);
         }
     }
@@ -249,14 +265,20 @@ public class Tracing
 
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-        TraceState ts = initiatedSessions.get(sessionId);
+        TraceState ts = sessions.get(sessionId);
         if (ts == null)
-            ts = new TraceState(message.from, sessionId);
+        {
+            ts = new TraceState(message.from, sessionId, false);
+            sessions.put(sessionId, ts);
+        }
         state.set(ts);
     }
 
+    /**
+     * Activate @param sessionId representing a session we've already seen
+     */
     public void continueExistingSession(UUID sessionId)
     {
-        state.set(initiatedSessions.get(sessionId));
+        state.set(sessions.get(sessionId));
     }
 }