You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 15:55:02 UTC
[2/6] git commit: improve tracing patch by jbellis;
reviewed by slebresne for CASSANDRA-5638
improve tracing
patch by jbellis; reviewed by slebresne for CASSANDRA-5638
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec
Branch: refs/heads/trunk
Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f
Parents: f30015c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 14 10:34:57 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:40:57 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/cql3/QueryProcessor.java | 6 +--
.../apache/cassandra/db/ReadVerbHandler.java | 2 -
.../apache/cassandra/service/StorageProxy.java | 50 ++++----------------
3 files changed, 10 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dae9cc9..513c96e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -111,6 +111,7 @@ public class QueryProcessor
private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
+ logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
statement.validate(clientState);
statement.checkAccess(clientState);
@@ -121,7 +122,6 @@ public class QueryProcessor
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
if (prepared.getBoundsTerms() > 0)
throw new InvalidRequestException("Cannot execute query with bind variables");
@@ -187,8 +187,6 @@ public class QueryProcessor
public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
throws RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
-
ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
@@ -245,7 +243,7 @@ public class QueryProcessor
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
- Tracing.trace("Parsing statement");
+ Tracing.trace("Parsing {}", queryStr);
ParsedStatement statement = parseStatement(queryStr);
// Set keyspace for statement that require login
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index a06035a..a05f7a2 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
{
if (command.isDigestQuery())
{
- if (logger.isTraceEnabled())
- logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
return new ReadResponse(ColumnFamily.digest(row.cf));
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5517387..adb3f2d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
runnable.run();
}
@@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
StageManager.getStage(Stage.MUTATION).execute(runnable);
}
@@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
@@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("Determining replicas for atomic batch");
long startTime = System.nanoTime();
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean
else
{
// belongs on a different server
- if (logger.isTraceEnabled())
- logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
@@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean
messages = HashMultimap.create();
dcMessages.put(dc, messages);
}
-
messages.put(rm.createMessage(), destination);
}
}
@@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean
CompactEndpointSerializationHelper.serialize(destination, dos);
String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
dos.writeUTF(id);
- logger.trace("Adding FWD message to {}@{}", id, destination);
}
message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
// send the combined message + forward headers
- String id = MessagingService.instance().sendRR(message, target, handler);
- logger.trace("Sending message to {}@{}", id, target);
+ Tracing.trace("Enqueuing message to {}", target);
+ MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local " + rm.toString(true));
-
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
@@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean
// Forward the actual update to the chosen leader replica
AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
- if (logger.isTraceEnabled())
- logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+ Tracing.trace("Enqueuing counter update to {}", endpoint);
MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
return responseHandler;
}
@@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand command = commands.get(i);
Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
- logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
@@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean
InetAddress dataPoint = endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
- logger.trace("reading data from {}", dataPoint);
+ Tracing.trace("Enqueuing data request to {}", dataPoint);
MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
}
@@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
- logger.trace("reading digest from {}", digestPoint);
+ Tracing.trace("Enqueuing digest request to {}", dataPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (message == null)
@@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- logger.trace("LocalReadRunnable reading {}", command);
-
Table table = Table.open(command.table);
Row r = command.getRow(table);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
@@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow() throws ExecutionException, InterruptedException
{
- logger.trace("LocalReadRunnable reading {}", command);
-
RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
handler.response(result);
@@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Determining replicas to query");
- logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
long startTime = System.nanoTime();
Table table = Table.open(command.keyspace);
@@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
&& OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
@@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
+ logger.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
- logger.trace("reading {} from {}", nodeCmd, endpoint);
}
}
@@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean
rows.add(row);
if (nodeCmd.countCQL3Rows)
cql3RowCount += row.getLiveCount(commandPredicate);
- logger.trace("range slices read {}", row.key);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean
public void response(MessageIn<UUID> message)
{
// record the response from the remote node.
- logger.trace("Received schema check response from {}", message.from.getHostAddress());
versions.put(message.from, message.payload);
latch.countDown();
}
@@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean
throw new AssertionError("This latch shouldn't have been interrupted.");
}
- logger.trace("My version is {}", myVersion);
-
// maps versions to hosts that are on that version.
Map<String, List<String>> results = new HashMap<String, List<String>>();
Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean
// special case for bounds containing exactly 1 (non-minimum) token
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
{
- logger.trace("restricted single token match for query {}", queryRange);
return Collections.singletonList(queryRange);
}
@@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean
remainder = splits.right;
}
ranges.add(remainder);
- if (logger.isDebugEnabled())
- logger.trace("restricted ranges for query {} are {}", queryRange, ranges);
return ranges;
}
@@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean
if (hintWindowExpired)
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
- logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+ Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
}
return !hintWindowExpired;
}
@@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean
final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
// Send out the truncate calls and track the responses with the callbacks.
- logger.trace("Starting to send truncate messages to hosts {}", allEndpoints);
+ Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
final Truncation truncation = new Truncation(keyspace, cfname);
MessageOut<Truncation> message = truncation.createMessage();
for (InetAddress endpoint : allEndpoints)
MessagingService.instance().sendRR(message, endpoint, responseHandler);
// Wait for all
- logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor);
responseHandler.get();
}