You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 15:55:01 UTC

[1/6] git commit: improve tracing patch by jbellis; reviewed by slebresne for CASSANDRA-5638

Updated Branches:
  refs/heads/cassandra-1.2 f30015c86 -> e5c34d7c2
  refs/heads/trunk 093e188a4 -> 26018be22


improve tracing
patch by jbellis; reviewed by slebresne for CASSANDRA-5638


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec

Branch: refs/heads/cassandra-1.2
Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f
Parents: f30015c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 14 10:34:57 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:40:57 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cql3/QueryProcessor.java   |  6 +--
 .../apache/cassandra/db/ReadVerbHandler.java    |  2 -
 .../apache/cassandra/service/StorageProxy.java  | 50 ++++----------------
 3 files changed, 10 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dae9cc9..513c96e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -111,6 +111,7 @@ public class QueryProcessor
     private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
+        logger.trace("Process {} @CL.{}", statement, cl);
         ClientState clientState = queryState.getClientState();
         statement.validate(clientState);
         statement.checkAccess(clientState);
@@ -121,7 +122,6 @@ public class QueryProcessor
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        logger.trace("CQL QUERY: {}", queryString);
         CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
         if (prepared.getBoundsTerms() > 0)
             throw new InvalidRequestException("Cannot execute query with bind variables");
@@ -187,8 +187,6 @@ public class QueryProcessor
     public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
     throws RequestValidationException
     {
-        logger.trace("CQL QUERY: {}", queryString);
-
         ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
         ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
 
@@ -245,7 +243,7 @@ public class QueryProcessor
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
     throws RequestValidationException
     {
-        Tracing.trace("Parsing statement");
+        Tracing.trace("Parsing {}", queryStr);
         ParsedStatement statement = parseStatement(queryStr);
 
         // Set keyspace for statement that require login

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index a06035a..a05f7a2 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
     {
         if (command.isDigestQuery())
         {
-            if (logger.isTraceEnabled())
-                logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
             return new ReadResponse(ColumnFamily.digest(row.cf));
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5517387..adb3f2d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean
                               ConsistencyLevel consistency_level)
             throws IOException
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("insert writing local & replicate " + mutation.toString(true));
-
                 Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 runnable.run();
             }
@@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean
                               ConsistencyLevel consistency_level)
             throws IOException
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("insert writing local & replicate " + mutation.toString(true));
-
                 Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 StageManager.getStage(Stage.MUTATION).execute(runnable);
             }
@@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
-        logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
@@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean
     {
         Tracing.trace("Determining replicas for atomic batch");
         long startTime = System.nanoTime();
-        logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
 
         List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     // belongs on a different server
-                    if (logger.isTraceEnabled())
-                        logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
                     Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
@@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean
                         messages = HashMultimap.create();
                         dcMessages.put(dc, messages);
                     }
-
                     messages.put(rm.createMessage(), destination);
                 }
             }
@@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean
             CompactEndpointSerializationHelper.serialize(destination, dos);
             String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
             dos.writeUTF(id);
-            logger.trace("Adding FWD message to {}@{}", id, destination);
         }
         message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
         // send the combined message + forward headers
-        String id = MessagingService.instance().sendRR(message, target, handler);
-        logger.trace("Sending message to {}@{}", id, target);
+        Tracing.trace("Enqueuing message to {}", target);
+        MessagingService.instance().sendRR(message, target, handler);
     }
 
     private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("insert writing local " + rm.toString(true));
-
         Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
@@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean
             // Forward the actual update to the chosen leader replica
             AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
 
-            if (logger.isTraceEnabled())
-                logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+            Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
             return responseHandler;
         }
@@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean
                 ReadCommand command = commands.get(i);
                 Table table = Table.open(command.getKeyspace());
                 assert !command.isDigestQuery();
-                logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
                 List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
                 CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
@@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean
                 InetAddress dataPoint = endpoints.get(0);
                 if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
-                    logger.trace("reading data locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
                 }
                 else
                 {
-                    logger.trace("reading data from {}", dataPoint);
+                    Tracing.trace("Enqueuing data request to {}", dataPoint);
                     MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
                 }
 
@@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                     {
-                        logger.trace("reading digest locally");
                         StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                     }
                     else
                     {
-                        logger.trace("reading digest from {}", digestPoint);
+                        Tracing.trace("Enqueuing digest request to {}", dataPoint);
                         // (We lazy-construct the digest Message object since it may not be necessary if we
                         // are doing a local digest read, or no digest reads at all.)
                         if (message == null)
@@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            logger.trace("LocalReadRunnable reading {}", command);
-
             Table table = Table.open(command.table);
             Row r = command.getRow(table);
             ReadResponse result = ReadVerbHandler.getResponse(command, r);
@@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow() throws ExecutionException, InterruptedException
         {
-            logger.trace("LocalReadRunnable reading {}", command);
-
             RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
             MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
             handler.response(result);
@@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, ReadTimeoutException
     {
         Tracing.trace("Determining replicas to query");
-        logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
         long startTime = System.nanoTime();
 
         Table table = Table.open(command.keyspace);
@@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean
                     && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
                     && OPTIMIZE_LOCAL_REQUESTS)
                 {
-                    logger.trace("reading data locally");
                     StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
                 }
                 else
@@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
+                        logger.trace("Enqueuing request to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, handler);
-                        logger.trace("reading {} from {}", nodeCmd, endpoint);
                     }
                 }
 
@@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean
                         rows.add(row);
                         if (nodeCmd.countCQL3Rows)
                             cql3RowCount += row.getLiveCount(commandPredicate);
-                        logger.trace("range slices read {}", row.key);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
@@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean
             public void response(MessageIn<UUID> message)
             {
                 // record the response from the remote node.
-                logger.trace("Received schema check response from {}", message.from.getHostAddress());
                 versions.put(message.from, message.payload);
                 latch.countDown();
             }
@@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean
             throw new AssertionError("This latch shouldn't have been interrupted.");
         }
 
-        logger.trace("My version is {}", myVersion);
-
         // maps versions to hosts that are on that version.
         Map<String, List<String>> results = new HashMap<String, List<String>>();
         Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean
         // special case for bounds containing exactly 1 (non-minimum) token
         if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
         {
-            logger.trace("restricted single token match for query {}", queryRange);
             return Collections.singletonList(queryRange);
         }
 
@@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean
             remainder = splits.right;
         }
         ranges.add(remainder);
-        if (logger.isDebugEnabled())
-            logger.trace("restricted ranges for query {} are {}", queryRange, ranges);
 
         return ranges;
     }
@@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean
         if (hintWindowExpired)
         {
             HintedHandOffManager.instance.metrics.incrPastWindow(ep);
-            logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+            Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
         }
         return !hintWindowExpired;
     }
@@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean
         final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
 
         // Send out the truncate calls and track the responses with the callbacks.
-        logger.trace("Starting to send truncate messages to hosts {}", allEndpoints);
+        Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
         final Truncation truncation = new Truncation(keyspace, cfname);
         MessageOut<Truncation> message = truncation.createMessage();
         for (InetAddress endpoint : allEndpoints)
             MessagingService.instance().sendRR(message, endpoint, responseHandler);
 
         // Wait for all
-        logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor);
         responseHandler.get();
     }
 


[2/6] git commit: improve tracing patch by jbellis; reviewed by slebresne for CASSANDRA-5638

Posted by jb...@apache.org.
improve tracing
patch by jbellis; reviewed by slebresne for CASSANDRA-5638


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec

Branch: refs/heads/trunk
Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f
Parents: f30015c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 14 10:34:57 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:40:57 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cql3/QueryProcessor.java   |  6 +--
 .../apache/cassandra/db/ReadVerbHandler.java    |  2 -
 .../apache/cassandra/service/StorageProxy.java  | 50 ++++----------------
 3 files changed, 10 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dae9cc9..513c96e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -111,6 +111,7 @@ public class QueryProcessor
     private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
+        logger.trace("Process {} @CL.{}", statement, cl);
         ClientState clientState = queryState.getClientState();
         statement.validate(clientState);
         statement.checkAccess(clientState);
@@ -121,7 +122,6 @@ public class QueryProcessor
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        logger.trace("CQL QUERY: {}", queryString);
         CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
         if (prepared.getBoundsTerms() > 0)
             throw new InvalidRequestException("Cannot execute query with bind variables");
@@ -187,8 +187,6 @@ public class QueryProcessor
     public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
     throws RequestValidationException
     {
-        logger.trace("CQL QUERY: {}", queryString);
-
         ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
         ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
 
@@ -245,7 +243,7 @@ public class QueryProcessor
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
     throws RequestValidationException
     {
-        Tracing.trace("Parsing statement");
+        Tracing.trace("Parsing {}", queryStr);
         ParsedStatement statement = parseStatement(queryStr);
 
         // Set keyspace for statement that require login

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index a06035a..a05f7a2 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
     {
         if (command.isDigestQuery())
         {
-            if (logger.isTraceEnabled())
-                logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
             return new ReadResponse(ColumnFamily.digest(row.cf));
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5517387..adb3f2d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean
                               ConsistencyLevel consistency_level)
             throws IOException
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("insert writing local & replicate " + mutation.toString(true));
-
                 Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 runnable.run();
             }
@@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean
                               ConsistencyLevel consistency_level)
             throws IOException
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("insert writing local & replicate " + mutation.toString(true));
-
                 Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 StageManager.getStage(Stage.MUTATION).execute(runnable);
             }
@@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
-        logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
@@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean
     {
         Tracing.trace("Determining replicas for atomic batch");
         long startTime = System.nanoTime();
-        logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
 
         List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     // belongs on a different server
-                    if (logger.isTraceEnabled())
-                        logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
                     Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
@@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean
                         messages = HashMultimap.create();
                         dcMessages.put(dc, messages);
                     }
-
                     messages.put(rm.createMessage(), destination);
                 }
             }
@@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean
             CompactEndpointSerializationHelper.serialize(destination, dos);
             String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
             dos.writeUTF(id);
-            logger.trace("Adding FWD message to {}@{}", id, destination);
         }
         message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
         // send the combined message + forward headers
-        String id = MessagingService.instance().sendRR(message, target, handler);
-        logger.trace("Sending message to {}@{}", id, target);
+        Tracing.trace("Enqueuing message to {}", target);
+        MessagingService.instance().sendRR(message, target, handler);
     }
 
     private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("insert writing local " + rm.toString(true));
-
         Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
@@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean
             // Forward the actual update to the chosen leader replica
             AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
 
-            if (logger.isTraceEnabled())
-                logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+            Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
             return responseHandler;
         }
@@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean
                 ReadCommand command = commands.get(i);
                 Table table = Table.open(command.getKeyspace());
                 assert !command.isDigestQuery();
-                logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
                 List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
                 CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
@@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean
                 InetAddress dataPoint = endpoints.get(0);
                 if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
-                    logger.trace("reading data locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
                 }
                 else
                 {
-                    logger.trace("reading data from {}", dataPoint);
+                    Tracing.trace("Enqueuing data request to {}", dataPoint);
                     MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
                 }
 
@@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                     {
-                        logger.trace("reading digest locally");
                         StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                     }
                     else
                     {
-                        logger.trace("reading digest from {}", digestPoint);
+                        Tracing.trace("Enqueuing digest request to {}", dataPoint);
                         // (We lazy-construct the digest Message object since it may not be necessary if we
                         // are doing a local digest read, or no digest reads at all.)
                         if (message == null)
@@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            logger.trace("LocalReadRunnable reading {}", command);
-
             Table table = Table.open(command.table);
             Row r = command.getRow(table);
             ReadResponse result = ReadVerbHandler.getResponse(command, r);
@@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow() throws ExecutionException, InterruptedException
         {
-            logger.trace("LocalReadRunnable reading {}", command);
-
             RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
             MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
             handler.response(result);
@@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, ReadTimeoutException
     {
         Tracing.trace("Determining replicas to query");
-        logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
         long startTime = System.nanoTime();
 
         Table table = Table.open(command.keyspace);
@@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean
                     && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
                     && OPTIMIZE_LOCAL_REQUESTS)
                 {
-                    logger.trace("reading data locally");
                     StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
                 }
                 else
@@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
+                        logger.trace("Enqueuing request to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, handler);
-                        logger.trace("reading {} from {}", nodeCmd, endpoint);
                     }
                 }
 
@@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean
                         rows.add(row);
                         if (nodeCmd.countCQL3Rows)
                             cql3RowCount += row.getLiveCount(commandPredicate);
-                        logger.trace("range slices read {}", row.key);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
@@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean
             public void response(MessageIn<UUID> message)
             {
                 // record the response from the remote node.
-                logger.trace("Received schema check response from {}", message.from.getHostAddress());
                 versions.put(message.from, message.payload);
                 latch.countDown();
             }
@@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean
             throw new AssertionError("This latch shouldn't have been interrupted.");
         }
 
-        logger.trace("My version is {}", myVersion);
-
         // maps versions to hosts that are on that version.
         Map<String, List<String>> results = new HashMap<String, List<String>>();
         Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean
         // special case for bounds containing exactly 1 (non-minimum) token
         if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
         {
-            logger.trace("restricted single token match for query {}", queryRange);
             return Collections.singletonList(queryRange);
         }
 
@@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean
             remainder = splits.right;
         }
         ranges.add(remainder);
-        if (logger.isDebugEnabled())
-            logger.trace("restricted ranges for query {} are {}", queryRange, ranges);
 
         return ranges;
     }
@@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean
         if (hintWindowExpired)
         {
             HintedHandOffManager.instance.metrics.incrPastWindow(ep);
-            logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+            Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
         }
         return !hintWindowExpired;
     }
@@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean
         final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
 
         // Send out the truncate calls and track the responses with the callbacks.
-        logger.trace("Starting to send truncate messages to hosts {}", allEndpoints);
+        Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
         final Truncation truncation = new Truncation(keyspace, cfname);
         MessageOut<Truncation> message = truncation.createMessage();
         for (InetAddress endpoint : allEndpoints)
             MessagingService.instance().sendRR(message, endpoint, responseHandler);
 
         // Wait for all
-        logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor);
         responseHandler.get();
     }
 


[6/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26018be2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26018be2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26018be2

Branch: refs/heads/trunk
Commit: 26018be223663989f0de87ff7a902a36be4df678
Parents: 5b0d43f e5c34d7
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:52:55 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:52:55 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java     | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/26018be2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 9d095cc,c12cace..1383be7
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1137,10 -858,10 +1137,10 @@@ public class StorageProxy implements St
          do
          {
              List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
 -            ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
 +            AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
  
              if (!commandsToRetry.isEmpty())
-                 logger.debug("Retrying {} commands", commandsToRetry.size());
+                 Tracing.trace("Retrying {} commands", commandsToRetry.size());
  
              // send out read requests
              for (int i = 0; i < commands.size(); i++)
@@@ -1174,25 -947,28 +1174,28 @@@
                  }
                  catch (DigestMismatchException ex)
                  {
-                     logger.trace("Digest mismatch: {}", ex);
-                     
 -                    Tracing.trace("Digest mismatch: {}", ex.toString());
 -                    
++                    Tracing.trace("Digest mismatch: {}", ex);
++
                      ReadRepairMetrics.repairedBlocking.mark();
 -                    
 +
                      // Do a full data read to resolve the correct response (and repair node that need be)
 -                    RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
 -                    ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
 +                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
 +                    ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
  
                      if (repairCommands == null)
                      {
                          repairCommands = new ArrayList<ReadCommand>();
                          repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
                      }
 -                    repairCommands.add(command);
 +                    repairCommands.add(exec.command);
                      repairResponseHandlers.add(repairHandler);
  
 -                    MessageOut<ReadCommand> message = command.createMessage();
 -                    for (InetAddress endpoint : handler.endpoints)
 +                    MessageOut<ReadCommand> message = exec.command.createMessage();
 +                    for (InetAddress endpoint : exec.handler.endpoints)
+                     {
+                         Tracing.trace("Enqueuing full data read to {}", endpoint);
                          MessagingService.instance().sendRR(message, endpoint, repairHandler);
+                     }
                  }
              }
  


[4/6] git commit: cleanup logger.debug too

Posted by jb...@apache.org.
cleanup logger.debug too


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5c34d7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5c34d7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5c34d7c

Branch: refs/heads/trunk
Commit: e5c34d7c29e9ec6dc162210b90fc69ea11a4c331
Parents: 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:51:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:51:39 2013 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/service/StorageProxy.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5c34d7c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index adb3f2d..c12cace 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -861,7 +861,7 @@ public class StorageProxy implements StorageProxyMBean
             ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
 
             if (!commandsToRetry.isEmpty())
-                logger.debug("Retrying {} commands", commandsToRetry.size());
+                Tracing.trace("Retrying {} commands", commandsToRetry.size());
 
             // send out read requests
             for (int i = 0; i < commands.size(); i++)
@@ -947,7 +947,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (DigestMismatchException ex)
                 {
-                    logger.debug("Digest mismatch: {}", ex.toString());
+                    Tracing.trace("Digest mismatch: {}", ex.toString());
                     
                     ReadRepairMetrics.repairedBlocking.mark();
                     
@@ -963,9 +963,10 @@ public class StorageProxy implements StorageProxyMBean
                     repairCommands.add(command);
                     repairResponseHandlers.add(repairHandler);
 
+                    MessageOut<ReadCommand> message = command.createMessage();
                     for (InetAddress endpoint : handler.endpoints)
                     {
-                        MessageOut<ReadCommand> message = command.createMessage();
+                        Tracing.trace("Enqueuing full data read to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, repairHandler);
                     }
                 }
@@ -1009,7 +1010,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
                     if (retryCommand != null)
                     {
-                        logger.debug("Issuing retry for read command");
+                        Tracing.trace("Issuing retry for read command");
                         if (commandsToRetry == Collections.EMPTY_LIST)
                             commandsToRetry = new ArrayList<ReadCommand>();
                         commandsToRetry.add(retryCommand);
@@ -1193,7 +1194,7 @@ public class StorageProxy implements StorageProxyMBean
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
-                        logger.trace("Enqueuing request to {}", endpoint);
+                        Tracing.trace("Enqueuing request to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, handler);
                     }
                 }


[5/6] git commit: cleanup logger.debug too

Posted by jb...@apache.org.
cleanup logger.debug too


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5c34d7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5c34d7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5c34d7c

Branch: refs/heads/cassandra-1.2
Commit: e5c34d7c29e9ec6dc162210b90fc69ea11a4c331
Parents: 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:51:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:51:39 2013 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/service/StorageProxy.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5c34d7c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index adb3f2d..c12cace 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -861,7 +861,7 @@ public class StorageProxy implements StorageProxyMBean
             ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
 
             if (!commandsToRetry.isEmpty())
-                logger.debug("Retrying {} commands", commandsToRetry.size());
+                Tracing.trace("Retrying {} commands", commandsToRetry.size());
 
             // send out read requests
             for (int i = 0; i < commands.size(); i++)
@@ -947,7 +947,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (DigestMismatchException ex)
                 {
-                    logger.debug("Digest mismatch: {}", ex.toString());
+                    Tracing.trace("Digest mismatch: {}", ex.toString());
                     
                     ReadRepairMetrics.repairedBlocking.mark();
                     
@@ -963,9 +963,10 @@ public class StorageProxy implements StorageProxyMBean
                     repairCommands.add(command);
                     repairResponseHandlers.add(repairHandler);
 
+                    MessageOut<ReadCommand> message = command.createMessage();
                     for (InetAddress endpoint : handler.endpoints)
                     {
-                        MessageOut<ReadCommand> message = command.createMessage();
+                        Tracing.trace("Enqueuing full data read to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, repairHandler);
                     }
                 }
@@ -1009,7 +1010,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
                     if (retryCommand != null)
                     {
-                        logger.debug("Issuing retry for read command");
+                        Tracing.trace("Issuing retry for read command");
                         if (commandsToRetry == Collections.EMPTY_LIST)
                             commandsToRetry = new ArrayList<ReadCommand>();
                         commandsToRetry.add(retryCommand);
@@ -1193,7 +1194,7 @@ public class StorageProxy implements StorageProxyMBean
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
-                        logger.trace("Enqueuing request to {}", endpoint);
+                        Tracing.trace("Enqueuing request to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, handler);
                     }
                 }


[3/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0d43f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0d43f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0d43f6

Branch: refs/heads/trunk
Commit: 5b0d43f6f2485f1439eb9d6a7ad112f6c9a515f3
Parents: 093e188 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:44:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:45:57 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cql3/QueryProcessor.java   |  6 ++--
 .../apache/cassandra/db/ReadVerbHandler.java    |  2 --
 .../apache/cassandra/service/StorageProxy.java  | 36 +++-----------------
 3 files changed, 6 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ac5afbc,513c96e..1b89fe3
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -109,9 -111,10 +109,10 @@@ public class QueryProcesso
      private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
      throws RequestExecutionException, RequestValidationException
      {
+         logger.trace("Process {} @CL.{}", statement, cl);
          ClientState clientState = queryState.getClientState();
 -        statement.validate(clientState);
          statement.checkAccess(clientState);
 +        statement.validate(clientState);
          ResultMessage result = statement.execute(cl, queryState, variables);
          return result == null ? new ResultMessage.Void() : result;
      }
@@@ -119,22 -122,10 +120,21 @@@
      public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
      throws RequestExecutionException, RequestValidationException
      {
 +        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
 +    }
 +
 +    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
 +    throws RequestExecutionException, RequestValidationException
 +    {
-         logger.trace("CQL QUERY: {}", queryString);
          CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
 -        if (prepared.getBoundsTerms() > 0)
 -            throw new InvalidRequestException("Cannot execute query with bind variables");
 -        return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList());
 +        if (prepared.getBoundsTerms() != variables.size())
 +            throw new InvalidRequestException("Invalid amount of bind variables");
 +        return processStatement(prepared, cl, queryState, variables);
 +    }
 +
 +    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
 +    {
 +        return getStatement(queryStr, queryState.getClientState()).statement;
      }
  
      public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 0203e4b,adb3f2d..9d095cc
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -137,10 -136,8 +137,7 @@@ public class StorageProxy implements St
                                AbstractWriteResponseHandler responseHandler,
                                String localDataCenter,
                                ConsistencyLevel consistency_level)
 -            throws IOException
              {
-                 if (logger.isTraceEnabled())
-                     logger.trace("insert writing local & replicate " + mutation.toString(true));
- 
                  Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                  runnable.run();
              }
@@@ -153,10 -150,8 +150,7 @@@
                                AbstractWriteResponseHandler responseHandler,
                                String localDataCenter,
                                ConsistencyLevel consistency_level)
 -            throws IOException
              {
-                 if (logger.isTraceEnabled())
-                     logger.trace("insert writing local & replicate " + mutation.toString(true));
- 
                  Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                  StageManager.getStage(Stage.MUTATION).execute(runnable);
              }
@@@ -779,28 -505,14 +771,27 @@@
                  else
                  {
                      // belongs on a different server
 +                    if (message == null)
 +                        message = rm.createMessage();
- 
                      String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
 -                    Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
 -                    if (messages == null)
 +                    // direct writes to local DC or old Cassandra versions
 +                    // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
 +                    if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
 +                    {
 +                        MessagingService.instance().sendRR(message, destination, responseHandler);
 +                    }
 +                    else
                      {
 -                        messages = HashMultimap.create();
 -                        dcMessages.put(dc, messages);
 +                        Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
 +                        if (messages == null)
 +                        {
 +                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
 +                            if (dcGroups == null)
 +                                dcGroups = new HashMap<String, Collection<InetAddress>>();
 +                            dcGroups.put(dc, messages);
 +                        }
 +                        messages.add(destination);
                      }
 -                    messages.put(rm.createMessage(), destination);
                  }
              }
              else
@@@ -908,12 -650,9 +899,9 @@@
  
      private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
      {
-         if (logger.isTraceEnabled())
-             logger.trace("insert writing local " + rm.toString(true));
- 
          Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
          {
 -            public void runMayThrow() throws IOException
 +            public void runMayThrow()
              {
                  rm.apply();
                  responseHandler.response(null);
@@@ -1159,17 -867,62 +1146,16 @@@
              for (int i = 0; i < commands.size(); i++)
              {
                  ReadCommand command = commands.get(i);
 -                Table table = Table.open(command.getKeyspace());
                  assert !command.isDigestQuery();
-                 logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
  
 -                List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
 -                CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
 -
 -                ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
 -                endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
 -                
 -                if (rrDecision != ReadRepairDecision.NONE) {
 -                    ReadRepairMetrics.attempted.mark();
 -                }
 -
 -                RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
 -                ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
 -                handler.assureSufficientLiveNodes();
 -                assert !endpoints.isEmpty();
 -                readCallbacks[i] = handler;
 -
 -                // The data-request message is sent to dataPoint, the node that will actually get the data for us
 -                InetAddress dataPoint = endpoints.get(0);
 -                if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -                {
 -                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
 -                }
 -                else
 -                {
 -                    Tracing.trace("Enqueuing data request to {}", dataPoint);
 -                    MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
 -                }
 -
 -                if (endpoints.size() == 1)
 -                    continue;
 -
 -                // send the other endpoints a digest request
 -                ReadCommand digestCommand = command.copy();
 -                digestCommand.setDigestQuery(true);
 -                MessageOut message = null;
 -                for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
 -                {
 -                    if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
 -                    }
 -                    else
 -                    {
 -                        Tracing.trace("Enqueuing digest request to {}", dataPoint);
 -                        // (We lazy-construct the digest Message object since it may not be necessary if we
 -                        // are doing a local digest read, or no digest reads at all.)
 -                        if (message == null)
 -                            message = digestCommand.createMessage();
 -                        MessagingService.instance().sendRR(message, digestPoint, handler);
 -                    }
 -                }
 +                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
 +                exec.executeAsync();
 +                readExecutors[i] = exec;
              }
  
 +            for (AbstractReadExecutor exec: readExecutors)
 +                exec.speculate();
 +
              // read results and make a second pass for any digest mismatches
              List<ReadCommand> repairCommands = null;
              List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
@@@ -1305,12 -1064,10 +1289,10 @@@
              this.handler = handler;
          }
  
 -        protected void runMayThrow() throws ExecutionException, InterruptedException
 +        protected void runMayThrow()
          {
-             logger.trace("LocalReadRunnable reading {}", command);
- 
              RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
 -            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
 +            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
              handler.response(result);
          }
      }