You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/08/29 20:18:20 UTC

[2/3] add request tracing patch by David Alves; reviewed by jbellis for CASSANDRA-1123

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0fea181..9d95fad 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,6 +37,9 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+
+import org.apache.cassandra.tracing.Tracing;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +80,9 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_12 = 5;
     public static final int current_version = VERSION_12;
 
-    /** we preface every message with this number so the recipient can validate the sender is sane */
+    /**
+     * we preface every message with this number so the recipient can validate the sender is sane
+     */
     static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* All verb handler identifiers */
@@ -113,6 +118,7 @@ public final class MessagingService implements MessagingServiceMBean
         SNAPSHOT, // Similar to nt snapshot
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
+        _TRACE, // dummy verb so we can use MS.droppedMessages
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -120,6 +126,7 @@ public final class MessagingService implements MessagingServiceMBean
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
+
     public static final Verb[] VERBS = Verb.values();
 
     public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
@@ -239,14 +246,15 @@ public final class MessagingService implements MessagingServiceMBean
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
-    /** One executor per destination InetAddress for streaming.
-     *
+    /**
+     * One executor per destination InetAddress for streaming.
+     * <p/>
      * See CASSANDRA-3494 for the background. We have streaming in place so we do not want to limit ourselves to
      * one stream at a time for throttling reasons. But, we also do not want to just arbitrarily stream an unlimited
      * amount of files at once because a single destination might have hundreds of files pending and it would cause a
      * seek storm. So, transfer exactly one file per destination host. That puts a very natural rate limit on it, in
      * addition to mapping well to the expected behavior in many cases.
-     *
+     * <p/>
      * We will create our stream executors with a core size of 0 so that they time out and do not consume threads. This
      * means the overhead in the degenerate case of having streamed to everyone in the ring over time as a ring changes,
      * is not going to be a thread per node - but rather an instance per node. That's totally fine.
@@ -268,11 +276,11 @@ public final class MessagingService implements MessagingServiceMBean
      * drop internal messages like bootstrap or repair notifications.
      */
     public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb.BINARY,
-                                                                                  Verb.MUTATION,
-                                                                                  Verb.READ_REPAIR,
-                                                                                  Verb.READ,
-                                                                                  Verb.RANGE_SLICE,
-                                                                                  Verb.REQUEST_RESPONSE);
+                                                                   Verb.MUTATION,
+                                                                   Verb.READ_REPAIR,
+                                                                   Verb.READ,
+                                                                   Verb.RANGE_SLICE,
+                                                                   Verb.REQUEST_RESPONSE);
 
     // total dropped message counts for server lifetime
     private final Map<Verb, AtomicInteger> droppedMessages = new EnumMap<Verb, AtomicInteger>(Verb.class);
@@ -364,7 +372,8 @@ public final class MessagingService implements MessagingServiceMBean
 
     /**
      * Track latency information for the dynamic snitch
-     * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
+     *
+     * @param cb      the callback associated with this message -- this lets us know if it's a message type we're interested in
      * @param address the host that replied to the message
      * @param latency
      */
@@ -380,7 +389,9 @@ public final class MessagingService implements MessagingServiceMBean
             subscriber.receiveTiming(address, latency);
     }
 
-    /** called from gossiper when it notices a node is not responding. */
+    /**
+     * called from gossiper when it notices a node is not responding.
+     */
     public void convict(InetAddress ep)
     {
         logger.debug("Resetting pool for " + ep);
@@ -389,12 +400,13 @@ public final class MessagingService implements MessagingServiceMBean
 
     /**
      * Listen on the specified port.
+     *
      * @param localEp InetAddress whose port to listen on.
      */
     public void listen(InetAddress localEp) throws IOException, ConfigurationException
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        for (ServerSocket ss: getServerSocket(localEp))
+        for (ServerSocket ss : getServerSocket(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
             th.start();
@@ -405,7 +417,7 @@ public final class MessagingService implements MessagingServiceMBean
 
     private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
     {
-       final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
+        final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
         if (DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
         {
             ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
@@ -427,7 +439,7 @@ public final class MessagingService implements MessagingServiceMBean
                 throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
             else if (e.getMessage().contains("Cannot assign requested address"))
                 throw new ConfigurationException("Unable to bind to address " + address
-                        + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+                                                 + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
             else
                 throw e;
         }
@@ -467,6 +479,7 @@ public final class MessagingService implements MessagingServiceMBean
     /**
      * Register a verb and the corresponding verb handler with the
      * Messaging Service.
+     *
      * @param verb
      * @param verbHandler handler for the specified verb
      */
@@ -479,6 +492,7 @@ public final class MessagingService implements MessagingServiceMBean
     /**
      * This method returns the verb handler associated with the registered
      * verb. If no handler has been registered then null is returned.
+     *
      * @param type for which the verb handler is sought
      * @return a reference to IVerbHandler which is the handler for the specified verb
      */
@@ -503,6 +517,7 @@ public final class MessagingService implements MessagingServiceMBean
     }
 
     private static final AtomicInteger idGen = new AtomicInteger(0);
+
     // TODO make these integers to avoid unnecessary int -> string -> int conversions
     private static String nextId()
     {
@@ -522,11 +537,12 @@ public final class MessagingService implements MessagingServiceMBean
      * which is invoked with the actual response.
      * Also holds the message (only mutation messages) to determine if it
      * needs to trigger a hint (uses StorageProxy for that).
+     *
      * @param message message to be sent.
-     * @param to endpoint to which the message needs to be sent
-     * @param cb callback interface which is used to pass the responses or
-     *           suggest that a timeout occurred to the invoker of the send().
-     *           suggest that a timeout occurred to the invoker of the send().
+     * @param to      endpoint to which the message needs to be sent
+     * @param cb      callback interface which is used to pass the responses,
+     *                or to signal that a timeout occurred, to the invoker of
+     *                the send().
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
@@ -550,8 +566,9 @@ public final class MessagingService implements MessagingServiceMBean
     /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
      * style messaging.
+     *
      * @param message messages to be sent.
-     * @param to endpoint to which the message needs to be sent
+     * @param to      endpoint to which the message needs to be sent
      */
     public void sendOneWay(MessageOut message, String id, InetAddress to)
     {
@@ -585,9 +602,10 @@ public final class MessagingService implements MessagingServiceMBean
     /**
      * Stream a file from source to destination. This is highly optimized
      * to not hold any of the contents of the file in memory.
+     *
      * @param header Header contains file to stream and other metadata.
-     * @param to endpoint to which we need to stream the file.
-    */
+     * @param to     endpoint to which we need to stream the file.
+     */
 
     public void stream(StreamHeader header, InetAddress to)
     {
@@ -605,8 +623,8 @@ public final class MessagingService implements MessagingServiceMBean
         }
 
         executor.execute(header.file == null || header.file.compressionInfo == null
-                                 ? new FileStreamTask(header, to)
-                                 : new CompressedFileStreamTask(header, to));
+                         ? new FileStreamTask(header, to)
+                         : new CompressedFileStreamTask(header, to));
     }
 
     public void incrementActiveStreamsOutbound()
@@ -619,7 +637,9 @@ public final class MessagingService implements MessagingServiceMBean
         activeStreamsOutbound.decrementAndGet();
     }
 
-    /** The count of active outbound stream tasks. */
+    /**
+     * The count of active outbound stream tasks.
+     */
     public int getActiveStreamsOutbound()
     {
         return activeStreamsOutbound.get();
@@ -675,9 +695,10 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void receive(MessageIn message, String id)
     {
+        Tracing.instance().initializeFromMessage(message);
+
         if (logger.isTraceEnabled())
-            logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb
-                          + " from " + id + "@" + message.from);
+            logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb + " from " + id + "@" + message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
@@ -815,7 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
             {
                 logTpstats = true;
                 logger.info("{} {} messages dropped in last {}ms",
-                             new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
+                            new Object[]{ recent, verb, LOG_DROPPED_INTERVAL_IN_MS });
                 lastDroppedInternal.put(verb, dropped.get());
             }
         }
@@ -875,7 +896,7 @@ public final class MessagingService implements MessagingServiceMBean
         OutboundTcpConnectionPool connection = connectionManagers.get(address);
         return connection == null ? 0 : connection.cmdCon.getPendingMessages();
     }
-    
+
     public Map<String, Long> getCommandCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, Long>();
@@ -945,7 +966,7 @@ public final class MessagingService implements MessagingServiceMBean
     public Map<String, Long> getTimeoutsPerHost()
     {
         Map<String, Long> result = new HashMap<String, Long>();
-        for (Map.Entry<String, AtomicLong> entry: timeoutsPerHost.entrySet())
+        for (Map.Entry<String, AtomicLong> entry : timeoutsPerHost.entrySet())
         {
             result.put(entry.getKey(), entry.getValue().get());
         }
@@ -955,7 +976,7 @@ public final class MessagingService implements MessagingServiceMBean
     public Map<String, Long> getRecentTimeoutsPerHost()
     {
         Map<String, Long> result = new HashMap<String, Long>();
-        for (Map.Entry<String, AtomicLong> entry: recentTimeoutsPerHost.entrySet())
+        for (Map.Entry<String, AtomicLong> entry : recentTimeoutsPerHost.entrySet())
         {
             String ip = entry.getKey();
             AtomicLong recent = entry.getValue();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 22a2066..efe6564 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -194,8 +194,8 @@ public class CassandraDaemon
         // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
         // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
         // until system table is opened.
-        for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_TABLE).values())
-            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);
+        for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_KS).values())
+            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_KS, cfm.cfName);
         try
         {
             SystemTable.checkHealth();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 89b3715..a04f3f1 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -32,8 +32,11 @@ import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.thrift.AuthenticationException;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SemanticVersion;
 
+import static org.apache.cassandra.tracing.Tracing.instance;
+
 /**
  * A container for per-client, thread-local state that Avro/Thrift threads must hold.
  * TODO: Kill thrift exceptions
@@ -47,6 +50,8 @@ public class ClientState
     // Current user for the session
     private AuthenticatedUser user;
     private String keyspace;
+    private UUID preparedTracingSession;
+
     // Reusable array for authorization
     private final List<Object> resource = new ArrayList<Object>();
     private SemanticVersion cqlVersion = DEFAULT_CQL_VERSION;
@@ -103,6 +108,36 @@ public class ClientState
         keyspace = ks;
     }
 
+    public boolean traceNextQuery()
+    {
+        if (preparedTracingSession != null)
+        {
+            return true;
+        }
+
+        double tracingProbability = StorageService.instance.getTracingProbability();
+        return tracingProbability != 0 && FBUtilities.threadLocalRandom().nextDouble() < tracingProbability;
+    }
+
+    public void prepareTracingSession(UUID sessionId)
+    {
+        this.preparedTracingSession = sessionId;
+    }
+
+    public void createSession()
+    {
+        if (this.preparedTracingSession == null)
+        {
+            instance().newSession();
+        }
+        else
+        {
+            UUID session = this.preparedTracingSession;
+            this.preparedTracingSession = null;
+            instance().newSession(session);
+        }
+    }
+
     public String getSchedulingValue()
     {
         switch(DatabaseDescriptor.getRequestSchedulerId())
@@ -141,6 +176,7 @@ public class ClientState
     {
         user = DatabaseDescriptor.getAuthenticator().defaultUser();
         keyspace = null;
+        preparedTracingSession = null;
         resourceClear();
         prepared.clear();
         cql3Prepared.clear();
@@ -174,7 +210,7 @@ public class ClientState
         validateKeyspace(keyspace);
 
         // hardcode disallowing messing with system keyspace
-        if (keyspace.equalsIgnoreCase(Table.SYSTEM_TABLE) && perm == Permission.WRITE)
+        if (keyspace.equalsIgnoreCase(Table.SYSTEM_KS) && perm == Permission.WRITE)
             throw new InvalidRequestException("system keyspace is not user-modifiable");
 
         resourceClear();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index dd6391c..a31cbbc 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -297,7 +297,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     public static UUID getLastMigrationId()
     {
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
-        Table defs = Table.open(Table.SYSTEM_TABLE);
+        Table defs = Table.open(Table.SYSTEM_KS);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
         ColumnFamily cf = cfStore.getColumnFamily(filter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f308405..5b85a84 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -588,7 +588,7 @@ public class StorageProxy implements StorageProxyMBean
     private static boolean systemTableQuery(List<ReadCommand> cmds)
     {
         for (ReadCommand cmd : cmds)
-            if (!cmd.table.equals(Table.SYSTEM_TABLE))
+            if (!cmd.table.equals(Table.SYSTEM_KS))
                 return false;
         return true;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8abd3e8..7f0714a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.service.AntiEntropyService.RepairFuture;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -152,6 +153,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     private boolean initialized;
     private volatile boolean joined = false;
 
+    /* probability of tracing any given request (0 disables, 1 traces all); volatile: set via JMX, read by request threads */
+    private volatile double tracingProbability = 0.0;
+
     private static enum Mode { NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
     private Mode operationMode;
 
@@ -429,7 +433,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             @Override
             public void runMayThrow() throws ExecutionException, InterruptedException, IOException
             {
-                ThreadPoolExecutor mutationStage = StageManager.getStage(Stage.MUTATION);
+                ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
                 if (mutationStage.isShutdown())
                     return; // drained already
 
@@ -821,6 +825,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             // Dont set any state for the node which is bootstrapping the existing token...
             tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         }
+        Tracing.instance();
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
         new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update
     }
@@ -1892,7 +1897,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        if (tableName.equals(Table.SYSTEM_TABLE))
+        if (tableName.equals(Table.SYSTEM_KS))
             throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
 
         NodeId.OneShotRenewer nodeIdRenewer = new NodeId.OneShotRenewer();
@@ -2067,7 +2072,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      */
     public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
-        if (Table.SYSTEM_TABLE.equals(tableName))
+        if (Table.SYSTEM_KS.equals(tableName))
             return;
 
         Collection<Range<Token>> ranges = getLocalRanges(tableName);
@@ -2116,7 +2121,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
-        if (Table.SYSTEM_TABLE.equals(tableName))
+        if (Table.SYSTEM_KS.equals(tableName))
             return;
 
         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
@@ -2134,7 +2139,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
-        if (Table.SYSTEM_TABLE.equals(tableName))
+        if (Table.SYSTEM_KS.equals(tableName))
             return;
 
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
@@ -3309,4 +3314,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     {
         MigrationManager.resetLocalSchema();
     }
+
+    public void setTraceProbability(double probability)
+    {
+        this.tracingProbability = probability;
+    }
+
+    public double getTracingProbability()
+    {
+        return tracingProbability;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0c4bfa3..350eff5 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -434,4 +434,18 @@ public interface StorageServiceMBean
     public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames);
 
     public void resetLocalSchema() throws IOException;
+
+    /**
+     * Enables/Disables tracing for the whole system. Only thrift requests can start tracing currently.
+     * 
+     * @param probability
+     *            (0, 1) will enable tracing on a random fraction of requests with the provided probability. 0 will
+     *            disable tracing and 1 will enable tracing for all requests (which might severely cripple the system)
+     */
+    public void setTraceProbability(double probability);
+
+    /**
+     * Returns the configured tracing probability.
+     */
+    public double getTracingProbability();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1437cb8..5eedcd7 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -27,7 +27,10 @@ import java.util.concurrent.TimeoutException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-import com.google.common.base.Predicates;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +44,16 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.*;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.thrift.TException;
 
 public class CassandraServer implements Cassandra.Iface
@@ -301,21 +308,61 @@ public class CassandraServer implements Cassandra.Iface
     public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("get_slice");
+        if (startSessionIfRequested())
+        {
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_parent", column_parent.toString(),
+                    "predicate", predicate.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get_slice", traceParameters);
+        }
+        else
+        {
+            logger.debug("get_slice");
+        }
 
-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-        return multigetSliceInternal(cState.getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+        try
+        {
+            state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+            return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent,
+                    predicate, consistency_level).get(key);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("multiget_slice");
+        if (startSessionIfRequested())
+        {
+            List<String> keysList = Lists.newArrayList();
+            for (ByteBuffer key : keys)
+            {
+                keysList.add(ByteBufferUtil.bytesToHex(key));
+            }
+            Map<String, String> traceParameters = ImmutableMap.of("keys", keysList.toString(),
+                    "column_parent", column_parent.toString(),
+                    "predicate", predicate.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("multiget_slice", traceParameters);
+        }
+        else
+        {
+            logger.debug("multiget_slice");
+        }
 
-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-        return multigetSliceInternal(cState.getKeyspace(), keys, column_parent, predicate, consistency_level);
+        try
+        {
+            state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+            return multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
@@ -380,74 +427,112 @@ public class CassandraServer implements Cassandra.Iface
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
-        logger.debug("get");

-        return internal_get(key, column_path, consistency_level);
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_path", column_path.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get", traceParameters);
+        }
+        else
+        {
+            logger.debug("get");
+        }
+        // the finally block below stops the trace session on every exit path, including exceptions
+        try
+        {
+            return internal_get(key, column_path, consistency_level);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("get_count");
-
-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-        Table table = Table.open(cState.getKeyspace());
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
-
-        if (predicate.column_names != null)
-            return get_slice(key, column_parent, predicate, consistency_level).size();
-
-        int pageSize;
-        // request by page if this is a large row
-        if (cfs.getMeanColumns() > 0)
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
         {
-            int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
-            pageSize = Math.min(COUNT_PAGE_SIZE,
-                                DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
-            pageSize = Math.max(2, pageSize);
-            logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_parent", column_parent.toString(),
+                    "predicate", predicate.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get_count", traceParameters);
         }
         else
         {
-            pageSize = COUNT_PAGE_SIZE;
+            logger.debug("get_count");
         }

-        int totalCount = 0;
-        List<ColumnOrSuperColumn> columns;
-
-        if (predicate.slice_range == null)
+        try
         {
-            predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                   false,
-                                                   Integer.MAX_VALUE);
-        }
+            ClientState cState = state();
+            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+            Table table = Table.open(cState.getKeyspace());
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);

-        int requestedCount = predicate.slice_range.count;
-        int pages = 0;
-        while (true)
-        {
-            predicate.slice_range.count = Math.min(pageSize, requestedCount);
-            columns = get_slice(key, column_parent, predicate, consistency_level);
-            if (columns.isEmpty())
-                break;
+            if (predicate.column_names != null) // explicit column names: a single slice gives the count
+                return multigetSliceInternal(cState.getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key).size();

-            ByteBuffer firstName = getName(columns.get(0));
-            int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
-            totalCount += newColumns;
-            requestedCount -= newColumns;
-            pages++;
-            // We're done if either:
-            //   - We've querying the number of columns requested by the user
-            //   - The last page wasn't full
-            if (requestedCount == 0 || columns.size() < predicate.slice_range.count)
-                break;
+            int pageSize;
+            // request by page if this is a large row
+            if (cfs.getMeanColumns() > 0)
+            {
+                int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns()); // truncates to 0 if mean row size < mean column count
+                pageSize = Math.min(COUNT_PAGE_SIZE,
+                        DatabaseDescriptor.getInMemoryCompactionLimit() / Math.max(1, averageColumnSize));
+                pageSize = Math.max(2, pageSize);
+                logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+            }
             else
-                predicate.slice_range.start = getName(columns.get(columns.size() - 1));
-        }
+            {
+                pageSize = COUNT_PAGE_SIZE;
+            }
+            // Slices (above and in the loop below) are read via multigetSliceInternal, not get_slice(): get_slice()'s finally block would stop this trace session mid-count.
+            int totalCount = 0;
+            List<ColumnOrSuperColumn> columns;
+
+            if (predicate.slice_range == null) // no range given: empty start/finish bounds with no count limit
+            {
+                predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        false,
+                        Integer.MAX_VALUE);
+            }
+
+            int requestedCount = predicate.slice_range.count;
+            int pages = 0;
+            while (true)
+            {
+                predicate.slice_range.count = Math.min(pageSize, requestedCount);
+                columns = multigetSliceInternal(cState.getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+                if (columns.isEmpty())
+                    break;
+                // pages after the first start at the previous page's last column, which was already counted
+                ByteBuffer firstName = getName(columns.get(0));
+                int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size()
+                        : columns.size() - 1;
+
+                totalCount += newColumns;
+                requestedCount -= newColumns;
+                pages++;
+                // We're done if either:
+                // - We've queried the number of columns requested by the user
+                // - The last page wasn't full
+                if (requestedCount == 0 || columns.size() < predicate.slice_range.count)
+                    break;
+                else
+                    predicate.slice_range.start = getName(columns.get(columns.size() - 1));
+            }

-        return totalCount;
+            return totalCount;
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     private static ByteBuffer getName(ColumnOrSuperColumn cosc)
@@ -460,19 +545,44 @@ public class CassandraServer implements Cassandra.Iface
     public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("multiget_count");
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            List<String> keysList = Lists.newArrayList(); // row keys, hex-encoded for the trace
+            for (ByteBuffer key : keys)
+            {
+                keysList.add(ByteBufferUtil.bytesToHex(key));
+            }
+            Map<String, String> traceParameters = ImmutableMap.of("keys", keysList.toString(),
+                    "column_parent", column_parent.toString(),
+                    "predicate", predicate.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("multiget_count", traceParameters);
+        }
+        else
+        {
+            logger.debug("multiget_count");
+        }

-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-        String keyspace = cState.getKeyspace();
+        try
+        {
+            ClientState cState = state();
+            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+            String keyspace = cState.getKeyspace();

-        Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
-        Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+            Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
+            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys,
+                    column_parent, predicate, consistency_level);

-        for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet()) {
-          counts.put(cf.getKey(), cf.getValue().size());
+            for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet()) // count per key = number of ColumnOrSuperColumn entries returned
+            {
+                counts.put(cf.getKey(), cf.getValue().size());
+            }
+            return counts;
+        }
+        finally
+        {
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
         }
-        return counts;
     }
 
     private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
@@ -507,9 +617,27 @@ public class CassandraServer implements Cassandra.Iface
     public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("insert");
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_parent", column_parent.toString(),
+                    "column", column.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("insert", traceParameters);
+        }
+        else
+        {
+            logger.debug("insert");
+        }

-        internal_insert(key, column_parent, column, consistency_level);
+        try
+        {
+            internal_insert(key, column_parent, column, consistency_level);
+        }
+        finally
+        {
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
+        }
     }
 
     private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
@@ -583,9 +711,30 @@ public class CassandraServer implements Cassandra.Iface
     public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("batch_mutate");
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = Maps.newLinkedHashMap(); // linked: parameters keep insertion order
+            for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry : mutation_map.entrySet()) // one parameter per hex-encoded row key
+            {
+                traceParameters.put(ByteBufferUtil.bytesToHex(mutationEntry.getKey()),
+                        Joiner.on(";").withKeyValueSeparator(":").join(mutationEntry.getValue())); // inner map rendered as "k:v;k:v"
+            }
+            traceParameters.put("consistency_level", consistency_level.name());
+            Tracing.instance().begin("batch_mutate", traceParameters);
+        }
+        else
+        {
+            logger.debug("batch_mutate");
+        }

-        internal_batch_mutate(mutation_map, consistency_level);
+        try
+        {
+            internal_batch_mutate(mutation_map, consistency_level);
+        }
+        finally
+        {
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
+        }
     }
 
     private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
@@ -612,9 +761,27 @@ public class CassandraServer implements Cassandra.Iface
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("remove");
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_path", column_path.toString(),
+                    "timestamp", timestamp+"",
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("remove", traceParameters);
+        }
+        else
+        {
+            logger.debug("remove");
+        }

-        internal_remove(key, column_path, timestamp, consistency_level, false);
+        try
+        {
+            internal_remove(key, column_path, timestamp, consistency_level, false); // isCommutativeOp = false
+        }
+        finally
+        {
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
+        }
     }
 
     private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException
@@ -648,117 +815,167 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TException, TimedOutException
     {
-        logger.debug("range_slice");
-
-        ClientState cState = state();
-        String keyspace = cState.getKeyspace();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);

-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
-        ThriftValidation.validateColumnParent(metadata, column_parent);
-        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-        ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
-        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = ImmutableMap.of(
+                    "column_parent", column_parent.toString(),
+                    "predicate", predicate.toString(),
+                    "range", range.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get_range_slices", traceParameters);
+        }
+        else
+        {
+            logger.debug("range_slice");
+        }

-        List<Row> rows;
         try
         {
-            IPartitioner p = StorageService.getPartitioner();
-            AbstractBounds<RowPosition> bounds;
-            if (range.start_key == null)
-            {
-                Token.TokenFactory tokenFactory = p.getTokenFactory();
-                Token left = tokenFactory.fromString(range.start_token);
-                Token right = tokenFactory.fromString(range.end_token);
-                bounds = Range.makeRowRange(left, right, p);
-            }
-            else
+
+            // check access, then validate every argument before building the range query
+            ClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+            ThriftValidation.validateColumnParent(metadata, column_parent);
+            ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+            ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
+            ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+            // No null initializer: the compiler then proves that every path reaching
+            // thriftifyKeySlices() has assigned rows (both catch blocks below throw).
+            List<Row> rows;
+            try
             {
-                bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+                IPartitioner p = StorageService.getPartitioner();
+                AbstractBounds<RowPosition> bounds;
+                if (range.start_key == null)
+                {
+                    Token.TokenFactory tokenFactory = p.getTokenFactory();
+                    Token left = tokenFactory.fromString(range.start_token);
+                    Token right = tokenFactory.fromString(range.end_token);
+                    bounds = Range.makeRowRange(left, right, p);
+                }
+                else
+                {
+                    bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(
+                            range.end_key, p));
+                }
+                schedule(DatabaseDescriptor.getRangeRpcTimeout());
+                try
+                {
+                    IFilter filter = ThriftValidation.asIFilter(predicate,
+                            metadata.getComparatorFor(column_parent.super_column));
+                    rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds,
+                            range.row_filter, range.count), consistency_level);
+                }
+                finally
+                {
+                    release();
+                }
+                assert rows != null;
             }
-            schedule(DatabaseDescriptor.getRangeRpcTimeout());
-            try
+            catch (TimeoutException e)
             {
-                IFilter filter = ThriftValidation.asIFilter(predicate, metadata.getComparatorFor(column_parent.super_column));
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds, range.row_filter, range.count), consistency_level);
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
-            finally
+            catch (IOException e)
             {
-                release();
+                throw new RuntimeException(e);
             }
-            assert rows != null;
-        }
-        catch (TimeoutException e)
-        {
-            logger.debug("... timed out");
-            throw new TimedOutException();
+
+            return thriftifyKeySlices(rows, column_parent, predicate);
         }
-        catch (IOException e)
+        finally
         {
-            throw new RuntimeException(e);
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
         }
-
-        return thriftifyKeySlices(rows, column_parent, predicate);
     }
 
     public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        logger.debug("get_paged_slice");
-
-        ClientState cState = state();
-        String keyspace = cState.getKeyspace();
-        cState.hasColumnFamilyAccess(column_family, Permission.READ);
-
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
-        ThriftValidation.validateKeyRange(metadata, null, range);
-        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
-
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
-
-        IPartitioner p = StorageService.getPartitioner();
-        AbstractBounds<RowPosition> bounds;
-        if (range.start_key == null)
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
         {
-            // (token, key) is unsupported, assume (token, token)
-            Token.TokenFactory tokenFactory = p.getTokenFactory();
-            Token left = tokenFactory.fromString(range.start_token);
-            Token right = tokenFactory.fromString(range.end_token);
-            bounds = Range.makeRowRange(left, right, p);
+            Map<String, String> traceParameters = ImmutableMap.of(
+                    "column_family", column_family,
+                    "range", range.toString(),
+                    "start_column", ByteBufferUtil.bytesToHex(start_column),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get_paged_slice", traceParameters);
         }
         else
         {
-            RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token).maxKeyBound(p)
-                                                    : RowPosition.forKey(range.end_key, p);
-            bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
+            logger.debug("get_paged_slice");
         }

-        List<Row> rows;
         try
         {
-            schedule(DatabaseDescriptor.getRangeRpcTimeout());
+            // access check and argument validation run inside the try, so stopSession() still runs if they throw
+            ClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(column_family, Permission.READ);
+
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
+            ThriftValidation.validateKeyRange(metadata, null, range);
+            ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+            // slice from start_column to an empty finish bound, not reversed, count -1 (presumably "no column limit" - confirm)
+            SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column,
+                    ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
+            // row bounds: a token range when no start_key is given, otherwise bounded by start_key
+            IPartitioner p = StorageService.getPartitioner();
+            AbstractBounds<RowPosition> bounds;
+            if (range.start_key == null)
+            {
+                // (token, key) is unsupported, assume (token, token)
+                Token.TokenFactory tokenFactory = p.getTokenFactory();
+                Token left = tokenFactory.fromString(range.start_token);
+                Token right = tokenFactory.fromString(range.end_token);
+                bounds = Range.makeRowRange(left, right, p);
+            }
+            else
+            {
+                RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token)
+                        .maxKeyBound(p)
+                        : RowPosition.forKey(range.end_key, p);
+                bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
+            }
+
+            List<Row> rows;
             try
             {
-                IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter, bounds, range.row_filter, range.count, true, true), consistency_level);
+                schedule(DatabaseDescriptor.getRangeRpcTimeout());
+                try
+                {
+                    IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
+                    rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter,
+                            bounds, range.row_filter, range.count, true, true), consistency_level);
+                }
+                finally
+                {
+                    release();
+                }
+                assert rows != null;
             }
-            finally
+            catch (TimeoutException e)
             {
-                release();
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
-            assert rows != null;
-        }
-        catch (TimeoutException e)
-        {
-            logger.debug("... timed out");
-            throw new TimedOutException();
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
         }
-        catch (IOException e)
+        finally
         {
-            throw new RuntimeException(e);
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
         }
-
-        return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
     }
 
     private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
@@ -776,46 +993,68 @@ public class CassandraServer implements Cassandra.Iface
 
     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        logger.debug("scan");
+        if (startSessionIfRequested()) // tracing requested: begin the session with this call's arguments
+        {
+            Map<String, String> traceParameters = ImmutableMap.of(
+                    "column_parent", column_parent.toString(),
+                    "index_clause", index_clause.toString(),
+                    "slice_predicate", column_predicate.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("get_indexed_slices", traceParameters);
+        }
+        else
+        {
+            logger.debug("scan");
+        }

-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-        String keyspace = cState.getKeyspace();
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
-        ThriftValidation.validateColumnParent(metadata, column_parent);
-        ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
-        ThriftValidation.validateIndexClauses(metadata, index_clause);
-        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+        try
+        {

-        IPartitioner p = StorageService.getPartitioner();
-        AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
-                                                                     p.getMinimumToken().minKeyBound());
+            ClientState cState = state();
+            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+            String keyspace = cState.getKeyspace();
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
+            ThriftValidation.validateColumnParent(metadata, column_parent);
+            ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
+            ThriftValidation.validateIndexClauses(metadata, index_clause);
+            ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);

-        IFilter filter = ThriftValidation.asIFilter(column_predicate, metadata.getComparatorFor(column_parent.super_column));
-        RangeSliceCommand command = new RangeSliceCommand(keyspace,
-                                                          column_parent.column_family,
-                                                          null,
-                                                          filter,
-                                                          bounds,
-                                                          index_clause.expressions,
-                                                          index_clause.count);
+            IPartitioner p = StorageService.getPartitioner();
+            AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
+                    p.getMinimumToken().minKeyBound());
+            // rows from index_clause.start_key up to the minimum token's min key bound (presumably "to the end of the ring" - confirm)
+            IFilter filter = ThriftValidation.asIFilter(column_predicate,
+                    metadata.getComparatorFor(column_parent.super_column));
+            RangeSliceCommand command = new RangeSliceCommand(keyspace,
+                    column_parent.column_family,
+                    null,
+                    filter,
+                    bounds,
+                    index_clause.expressions,
+                    index_clause.count);
+            // NOTE(review): unlike get_range_slices/get_paged_slice, this read is not wrapped in schedule()/release() - confirm intended
+            List<Row> rows;
+            try
+            {
+                rows = StorageProxy.getRangeSlice(command, consistency_level);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (TimeoutException e)
+            {
+                logger.debug("... timed out");
+                throw new TimedOutException();
+            }
+
+            return thriftifyKeySlices(rows, column_parent, column_predicate);

-        List<Row> rows;
-        try
-        {
-            rows = StorageProxy.getRangeSlice(command, consistency_level);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
         }
-        catch (TimeoutException e)
+        finally
         {
-            logger.debug("... timed out");
-            throw new TimedOutException();
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
         }
-
-        return thriftifyKeySlices(rows, column_parent, column_predicate);
     }
 
     public List<KsDef> describe_keyspaces() throws TException, InvalidRequestException
@@ -1071,8 +1310,17 @@ public class CassandraServer implements Cassandra.Iface
     public void truncate(String cfname) throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
         ClientState cState = state();
-        logger.debug("truncating {} in {}", cfname, cState.getKeyspace());
         cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
+        // the access check above runs before any trace session is begun for this call
+        if (startSessionIfRequested())
+        {
+            Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", cState.getKeyspace()));
+        }
+        else
+        {
+            logger.debug("truncating {}.{}", cState.getKeyspace(), cfname);
+        }
+
         try
         {
             schedule(DatabaseDescriptor.getTruncateRpcTimeout());
@@ -1094,6 +1342,10 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw (UnavailableException) new UnavailableException().initCause(e);
         }
+        finally
+        {
+            Tracing.instance().stopSession(); // runs on every exit path, whether or not tracing was requested
+        }
     }
 
     public void set_keyspace(String keyspace) throws InvalidRequestException, TException
@@ -1114,41 +1366,78 @@ public class CassandraServer implements Cassandra.Iface
     public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
             throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        logger.debug("add");
-
-        ClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
-        String keyspace = cState.getKeyspace();
-
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
-        ThriftValidation.validateKey(metadata, key);
-        ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
-        ThriftValidation.validateColumnParent(metadata, column_parent);
-        // SuperColumn field is usually optional, but not when we're adding
-        if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+        if (startSessionIfRequested())
         {
-            throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
+            Map<String, String> traceParameters = ImmutableMap.of(
+                    "column_parent", column_parent.toString(),
+                    "column", column.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("add", traceParameters);
+        }
+        else
+        {
+            logger.debug("add");
         }
-        ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
 
-        RowMutation rm = new RowMutation(keyspace, key);
         try
         {
-            rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value);
+            ClientState cState = state();
+            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
+            String keyspace = cState.getKeyspace();
+
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
+            ThriftValidation.validateKey(metadata, key);
+            ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+            ThriftValidation.validateColumnParent(metadata, column_parent);
+            // SuperColumn field is usually optional, but not when we're adding
+            if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+            {
+                throw new InvalidRequestException("missing mandatory super column name for super CF "
+                        + column_parent.column_family);
+            }
+            ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
+
+            RowMutation rm = new RowMutation(keyspace, key);
+            try
+            {
+                rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name),
+                        column.value);
+            }
+            catch (MarshalException e)
+            {
+                throw new InvalidRequestException(e.getMessage());
+            }
+            doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
         }
-        catch (MarshalException e)
+        finally
         {
-            throw new InvalidRequestException(e.getMessage());
+            Tracing.instance().stopSession();
         }
-        doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
     }
 
     public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
             throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        logger.debug("remove_counter");
+        if (startSessionIfRequested())
+        {
+            Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+                    "column_path", path.toString(),
+                    "consistency_level", consistency_level.name());
+            Tracing.instance().begin("remove_counter", traceParameters);
+        }
+        else
+        {
+            logger.debug("remove_counter");
+        }
 
-        internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+        try
+        {
+            internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     private static String uncompress(ByteBuffer query, Compression compression) throws InvalidRequestException
@@ -1212,21 +1501,36 @@ public class CassandraServer implements Cassandra.Iface
     public CqlResult execute_cql_query(ByteBuffer query, Compression compression)
     throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
     {
-        if (logger.isDebugEnabled()) logger.debug("execute_cql_query");
-
-        String queryString = uncompress(query,compression);
+        try
+        {
+            String queryString = uncompress(query, compression);
+            if (startSessionIfRequested())
+            {
+                Tracing.instance().begin("execute_cql_query",
+                                         ImmutableMap.of("query", queryString));
+            }
+            else
+            {
+                logger.debug("execute_cql_query");
+            }
 
-        ClientState cState = state();
-        if (cState.getCQLVersion().major == 2)
-            return QueryProcessor.process(queryString, state());
-        else
-            return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+            ClientState cState = state();
+            if (cState.getCQLVersion().major == 2)
+                return QueryProcessor.process(queryString, state());
+            else
+                return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     public CqlPreparedResult prepare_cql_query(ByteBuffer query, Compression compression)
     throws InvalidRequestException, TException
     {
-        if (logger.isDebugEnabled()) logger.debug("prepare_cql_query");
+        if (logger.isDebugEnabled())
+            logger.debug("prepare_cql_query");
 
         String queryString = uncompress(query,compression);
 
@@ -1240,28 +1544,45 @@ public class CassandraServer implements Cassandra.Iface
     public CqlResult execute_prepared_cql_query(int itemId, List<ByteBuffer> bindVariables)
     throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
     {
-        if (logger.isDebugEnabled()) logger.debug("execute_prepared_cql_query");
-
-        ClientState cState = state();
-        if (cState.getCQLVersion().major == 2)
+        if (startSessionIfRequested())
         {
-            CQLStatement statement = cState.getPrepared().get(itemId);
-
-            if (statement == null)
-                throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
-            logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
-
-            return QueryProcessor.processPrepared(statement, cState, bindVariables);
+            // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
+            Tracing.instance().begin("execute_prepared_cql_query", Collections.<String, String>emptyMap());
         }
         else
         {
-            org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+            logger.debug("execute_prepared_cql_query");
+        }
 
-            if (statement == null)
-                throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
-            logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
+        try
+        {
+            ClientState cState = state();
+            if (cState.getCQLVersion().major == 2)
+            {
+                CQLStatement statement = cState.getPrepared().get(itemId);
+
+                if (statement == null)
+                    throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+                logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
+
+                return QueryProcessor.processPrepared(statement, cState, bindVariables);
+            }
+            else
+            {
+                org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+
+                if (statement == null)
+                    throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+                logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
+                        statement.getBoundsTerms());
 
-            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
+                return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables)
+                        .toThriftResult();
+            }
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
         }
     }
 
@@ -1272,5 +1593,22 @@ public class CassandraServer implements Cassandra.Iface
         state().setCQLVersion(version);
     }
 
+    public ByteBuffer trace_next_query() throws TException
+    {
+        UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress());
+        state().prepareTracingSession(sessionId);
+        return TimeUUIDType.instance.decompose(sessionId);
+    }
+
+    private boolean startSessionIfRequested()
+    {
+        if (state().traceNextQuery())
+        {
+            state().createSession();
+            return true;
+        }
+        return false;
+    }
+
     // main method moved to CassandraDaemon
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index cf42f9e..d31515e 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -605,7 +605,7 @@ public class ThriftValidation
 
     public static void validateKeyspaceNotSystem(String modifiedKeyspace) throws InvalidRequestException
     {
-        if (modifiedKeyspace.equalsIgnoreCase(Table.SYSTEM_TABLE))
+        if (modifiedKeyspace.equalsIgnoreCase(Table.SYSTEM_KS))
             throw new InvalidRequestException("system keyspace is not user-modifiable");
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 6bede1b..8cb7dbe 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -36,6 +36,7 @@ import org.apache.commons.cli.*;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
@@ -46,7 +47,7 @@ import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
 
-public class NodeCmd
+public class NodeCmd
 {
     private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = new Pair<String, String>("cf", "column-family");
     private static final Pair<String, String> HOST_OPT = new Pair<String, String>("h", "host");
@@ -120,6 +121,7 @@ public class NodeCmd
         SETCOMPACTIONTHRESHOLD,
         SETCOMPACTIONTHROUGHPUT,
         SETSTREAMTHROUGHPUT,
+        SETTRACEPROBABILITY,
         SNAPSHOT,
         STATUS,
         STATUSTHRIFT,
@@ -145,7 +147,7 @@ public class NodeCmd
         // No args
         addCmdHelp(header, "ring", "Print information about the token ring");
         addCmdHelp(header, "join", "Join the ring");
-        addCmdHelp(header, "info [-T/--tokens]", "Print node information (uptime, load, ...)");
+        addCmdHelp(header, "info [-T/--tokens]", "Print node information (uptime, load, ...)");
         addCmdHelp(header, "status", "Print cluster information (state, load, IDs, ...)");
         addCmdHelp(header, "cfstats", "Print statistics on column families");
         addCmdHelp(header, "version", "Print cassandra version");
@@ -173,6 +175,7 @@ public class NodeCmd
         addCmdHelp(header, "describering [keyspace]", "Shows the token ranges info of a given keyspace.");
         addCmdHelp(header, "rangekeysample", "Shows the sampled keys held across all keyspaces.");
         addCmdHelp(header, "rebuild [src-dc-name]", "Rebuild data by streaming from other nodes (similarly to bootstrap)");
+        addCmdHelp(header, "settraceprobability [value]", "Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default");
 
         // Two args
         addCmdHelp(header, "snapshot [keyspaces...] -cf [columnfamilyName] -t [snapshotName]", "Take a snapshot of the optionally specified column family of the specified keyspaces using optional name snapshotName");
@@ -1033,6 +1036,11 @@ public class NodeCmd
                     probe.setStreamThroughput(Integer.valueOf(arguments[0]));
                     break;
 
+                case SETTRACEPROBABILITY :
+                    if (arguments.length != 1) { badUse("Missing value argument."); }
+                    probe.setTraceProbability(Double.valueOf(arguments[0]));
+                    break;
+
                 case REBUILD :
                     if (arguments.length > 1) { badUse("Too many arguments."); }
                     probe.rebuild(arguments.length == 1 ? arguments[0] : null);
@@ -1284,7 +1292,7 @@ public class NodeCmd
                     catch (ExecutionException ee) { err(ee, "Error occured during compaction"); }
                     break;
                 case CLEANUP :
-                    if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
+                    if (keyspace.equals(Table.SYSTEM_KS)) { break; } // Skip cleanup on system cfs.
                     try { probe.forceTableCleanup(keyspace, columnFamilies); }
                     catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); }
                     break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 494fcda..2d57911 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -701,6 +701,11 @@ public class NodeProbe
         ssProxy.setStreamThroughputMbPerSec(value);
     }
 
+    public void setTraceProbability(double value)
+    {
+        ssProxy.setTraceProbability(value);
+    }
+
     public String getSchemaVersion()
     {
         return ssProxy.getSchemaVersion();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
new file mode 100644
index 0000000..7fca842
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tracing;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
+ * operation is being traced.
+ */
+public class TraceState
+{
+    /** Identifier of the tracing session this state belongs to (never null). */
+    public final UUID sessionId;
+    /** Address of the coordinator node for this session (never null). */
+    public final InetAddress coordinator;
+    /** Started when this state is constructed; measures time elapsed in the session. */
+    public final Stopwatch watch;
+    /** {@code sessionId} serialized to bytes once, so callers need not re-encode it. */
+    public final ByteBuffer sessionIdBytes;
+
+    public TraceState(InetAddress coordinator, UUID sessionId)
+    {
+        assert coordinator != null;
+        assert sessionId != null;
+
+        this.coordinator = coordinator;
+        this.sessionId = sessionId;
+        sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+        watch = new Stopwatch();
+        watch.start();
+    }
+
+    /**
+     * @return microseconds elapsed since this state was created, saturated at {@link Integer#MAX_VALUE}
+     */
+    public int elapsed()
+    {
+        long elapsed = watch.elapsedTime(TimeUnit.MICROSECONDS);
+        return elapsed < Integer.MAX_VALUE ? (int) elapsed : Integer.MAX_VALUE;
+    }
+}