You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/25 17:05:38 UTC
git commit: add OTC tracing support for replica nodes too
Updated Branches:
refs/heads/trunk 6adf52c94 -> 10777f2cc
add OTC tracing support for replica nodes too
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10777f2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10777f2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10777f2c
Branch: refs/heads/trunk
Commit: 10777f2cc8d0ab51fd760427a6f11a1d41aee596
Parents: 6adf52c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 25 10:05:35 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 25 10:05:35 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/db/ReadVerbHandler.java | 1 +
.../cassandra/net/OutboundTcpConnection.java | 5 ++-
.../cassandra/service/RangeSliceVerbHandler.java | 1 +
.../org/apache/cassandra/tracing/TraceState.java | 4 +-
src/java/org/apache/cassandra/tracing/Tracing.java | 38 +++++++++++---
5 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index c0814a1..d4af0be 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
public class ReadVerbHandler implements IVerbHandler<ReadCommand>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1093f70..f79e4a0 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -170,8 +171,10 @@ public class OutboundTcpConnection extends Thread
byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER);
if (sessionBytes != null)
{
- Tracing.instance().continueExistingSession(UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)));
+ UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
+ Tracing.instance().continueExistingSession(sessionId);
logger.debug("Sending message to {}", poolReference.endPoint());
+ Tracing.instance().maybeStopNonlocalSession(sessionId);
}
write(qm.message, qm.id, qm.timestamp, out, targetVersion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index fd26760..8727005 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index ae0fa20..e305471 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -36,14 +36,16 @@ public class TraceState
public final InetAddress coordinator;
public final Stopwatch watch;
public final ByteBuffer sessionIdBytes;
+ public final boolean isLocallyOwned;
- public TraceState(InetAddress coordinator, UUID sessionId)
+ public TraceState(InetAddress coordinator, UUID sessionId, boolean locallyOwned)
{
assert coordinator != null;
assert sessionId != null;
this.coordinator = coordinator;
this.sessionId = sessionId;
+ this.isLocallyOwned = locallyOwned;
sessionIdBytes = ByteBufferUtil.bytes(sessionId);
watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10777f2c/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index df49960..f2f8869 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -80,8 +80,7 @@ public class Tracing
private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>();
- /** sessions that were initiated on this node */
- private final Map<UUID, TraceState> initiatedSessions = new ConcurrentHashMap<UUID, TraceState>();
+ private final Map<UUID, TraceState> sessions = new ConcurrentHashMap<UUID, TraceState>();
public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value)
{
@@ -153,13 +152,30 @@ public class Tracing
{
assert state.get() == null;
- TraceState ts = new TraceState(localAddress, sessionId);
+ TraceState ts = new TraceState(localAddress, sessionId, true);
state.set(ts);
- initiatedSessions.put(sessionId, ts);
+ sessions.put(sessionId, ts);
return sessionId;
}
+ /**
+ * Removes the state data but does not log it as complete.
+ * For use by replica nodes, after replying to the master.
+ *
+ * Note: checking that the session exists is the job of the caller.
+ */
+ public void maybeStopNonlocalSession(UUID sessionId)
+ {
+ TraceState state = sessions.get(sessionId);
+ assert state != null;
+ if (!state.isLocallyOwned)
+ sessions.remove(state.sessionId);
+ }
+
+ /**
+ * Stop the session and record its complete. Called by coodinator when request is complete.
+ */
public void stopSession()
{
TraceState state = this.state.get();
@@ -186,7 +202,7 @@ public class Tracing
}
});
- initiatedSessions.remove(state.sessionId);
+ sessions.remove(state.sessionId);
this.state.set(null);
}
}
@@ -249,14 +265,20 @@ public class Tracing
assert sessionBytes.length == 16;
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
- TraceState ts = initiatedSessions.get(sessionId);
+ TraceState ts = sessions.get(sessionId);
if (ts == null)
- ts = new TraceState(message.from, sessionId);
+ {
+ ts = new TraceState(message.from, sessionId, false);
+ sessions.put(sessionId, ts);
+ }
state.set(ts);
}
+ /**
+ * Activate @param sessionId representing a session we've already seen
+ */
public void continueExistingSession(UUID sessionId)
{
- state.set(initiatedSessions.get(sessionId));
+ state.set(sessions.get(sessionId));
}
}