You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/25 16:50:23 UTC

[2/2] git commit: add tracing to OutboundTcpConnection

add tracing to OutboundTcpConnection


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6adf52c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6adf52c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6adf52c9

Branch: refs/heads/trunk
Commit: 6adf52c94d5c819bd1b4c38be9cdc234d40aa1fd
Parents: ec9af99
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 24 21:16:29 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 25 09:50:19 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadVerbHandler.java   |    2 +-
 .../cassandra/db/RowMutationVerbHandler.java       |    2 +-
 .../apache/cassandra/db/TruncateVerbHandler.java   |    2 +-
 .../org/apache/cassandra/net/MessagingService.java |    2 +-
 .../cassandra/net/OutboundTcpConnection.java       |   18 +++++++++++---
 .../cassandra/service/IndexScanVerbHandler.java    |    2 +-
 .../cassandra/service/RangeSliceVerbHandler.java   |    2 +-
 .../cassandra/service/SnapshotVerbHandler.java     |    3 +-
 .../org/apache/cassandra/service/StorageProxy.java |    2 +
 src/java/org/apache/cassandra/tracing/Tracing.java |    6 ++++-
 10 files changed, 28 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 7477993..c0814a1 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -49,7 +49,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
             MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
                                                                           getResponse(command, row),
                                                                           ReadResponse.serializer);
-            logger.debug("Sending response to {}", message.from);
+            logger.debug("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply, id, message.from);
         }
         catch (IOException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index a8cdfdc..842c539 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -54,7 +54,7 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
 
             rm.apply();
             WriteResponse response = new WriteResponse();
-            logger.debug("Sending response to {}", replyTo);
+            logger.debug("Enqueuing response to {}", replyTo);
             MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index ff9448f..ea9ef14 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -49,7 +49,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation>
         logger.debug("Truncate operation succeeded at this host");
 
         TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
-        logger.debug("{} applied.  Sending response to {}@{} ", new Object[]{ t, id, message.from });
+        logger.debug("{} applied.  Enqueuing response to {}@{} ", new Object[]{ t, id, message.from });
         MessagingService.instance().sendReply(response.createMessage(), id, message.from);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 46ac6dd..10de977 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -692,7 +692,7 @@ public final class MessagingService implements MessagingServiceMBean
     public void receive(MessageIn message, String id, long timestamp)
     {
         Tracing.instance().initializeFromMessage(message);
-        logger.debug("Request received from {}", message.from);
+        logger.debug("Message received from {}", message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index e52b4cc..1093f70 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -30,7 +31,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 import org.xerial.snappy.SnappyOutputStream;
 
 import org.apache.cassandra.config.Config;
@@ -164,6 +167,13 @@ public class OutboundTcpConnection extends Thread
     {
         try
         {
+            byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER);
+            if (sessionBytes != null)
+            {
+                Tracing.instance().continueExistingSession(UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)));
+                logger.debug("Sending message to {}", poolReference.endPoint());
+            }
+
             write(qm.message, qm.id, qm.timestamp, out, targetVersion);
             completed++;
             if (active.peek() == null)
@@ -227,8 +237,8 @@ public class OutboundTcpConnection extends Thread
             }
             catch (IOException e)
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("exception closing connection to " + poolReference.endPoint(), e);
+                if (logger.isTraceEnabled())
+                    logger.trace("exception closing connection to " + poolReference.endPoint(), e);
             }
             out = null;
             socket = null;
@@ -270,7 +280,7 @@ public class OutboundTcpConnection extends Thread
 
                     if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
                     {
-                        logger.debug("Detected higher max version {} (using {}); will reconnect when queued messages are done",
+                        logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
                                      maxTargetVersion, targetVersion);
                         MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
                         softCloseSocket();
@@ -281,7 +291,7 @@ public class OutboundTcpConnection extends Thread
                     if (shouldCompressConnection())
                     {
                         out.flush();
-                        logger.debug("Upgrading OutputStream to be compressed");
+                        logger.trace("Upgrading OutputStream to be compressed");
                         out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 774e046..fa4aabb 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -44,7 +44,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand>
                                         command.index_clause.count,
                                         ThriftValidation.asIFilter(command.predicate, cfs.getComparator()));
             RangeSliceReply reply = new RangeSliceReply(rows);
-            logger.debug("Sending response to {}", message.from);
+            logger.debug("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }
         catch (Exception ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index fbbf88a..fd26760 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -55,7 +55,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
             RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
-            logger.debug("Sending response to {}", message.from);
+            logger.debug("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }
         catch (Exception ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index 2d51590..f0f814d 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -38,8 +38,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
             Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
         else
             Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
-        if (logger.isDebugEnabled())
-            logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
+        logger.debug("Enqueuing response to snapshot request {} to {} ", command.snapshot_name, message.from);
         MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 57deaad..b08f5b8 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -170,6 +170,7 @@ public class StorageProxy implements StorageProxyMBean
     public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
+        logger.debug("Determining replicas for mutation");
         logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
@@ -247,6 +248,7 @@ public class StorageProxy implements StorageProxyMBean
     public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
+        logger.debug("Determining replicas for atomic batch");
         long startTime = System.nanoTime();
         logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6adf52c9/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index c158b10..df49960 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -248,11 +248,15 @@ public class Tracing
         }
 
         assert sessionBytes.length == 16;
-
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = initiatedSessions.get(sessionId);
         if (ts == null)
             ts = new TraceState(message.from, sessionId);
         state.set(ts);
     }
+
+    public void continueExistingSession(UUID sessionId)
+    {
+        state.set(initiatedSessions.get(sessionId));
+    }
 }