You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 15:55:01 UTC
[1/6] git commit: improve tracing; patch by jbellis;
reviewed by slebresne for CASSANDRA-5638
Updated Branches:
refs/heads/cassandra-1.2 f30015c86 -> e5c34d7c2
refs/heads/trunk 093e188a4 -> 26018be22
improve tracing
patch by jbellis; reviewed by slebresne for CASSANDRA-5638
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec
Branch: refs/heads/cassandra-1.2
Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f
Parents: f30015c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 14 10:34:57 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:40:57 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/cql3/QueryProcessor.java | 6 +--
.../apache/cassandra/db/ReadVerbHandler.java | 2 -
.../apache/cassandra/service/StorageProxy.java | 50 ++++----------------
3 files changed, 10 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dae9cc9..513c96e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -111,6 +111,7 @@ public class QueryProcessor
private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
+ logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
statement.validate(clientState);
statement.checkAccess(clientState);
@@ -121,7 +122,6 @@ public class QueryProcessor
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
if (prepared.getBoundsTerms() > 0)
throw new InvalidRequestException("Cannot execute query with bind variables");
@@ -187,8 +187,6 @@ public class QueryProcessor
public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
throws RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
-
ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
@@ -245,7 +243,7 @@ public class QueryProcessor
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
- Tracing.trace("Parsing statement");
+ Tracing.trace("Parsing {}", queryStr);
ParsedStatement statement = parseStatement(queryStr);
// Set keyspace for statement that require login
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index a06035a..a05f7a2 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
{
if (command.isDigestQuery())
{
- if (logger.isTraceEnabled())
- logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
return new ReadResponse(ColumnFamily.digest(row.cf));
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5517387..adb3f2d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
runnable.run();
}
@@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
StageManager.getStage(Stage.MUTATION).execute(runnable);
}
@@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
@@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("Determining replicas for atomic batch");
long startTime = System.nanoTime();
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean
else
{
// belongs on a different server
- if (logger.isTraceEnabled())
- logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
@@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean
messages = HashMultimap.create();
dcMessages.put(dc, messages);
}
-
messages.put(rm.createMessage(), destination);
}
}
@@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean
CompactEndpointSerializationHelper.serialize(destination, dos);
String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
dos.writeUTF(id);
- logger.trace("Adding FWD message to {}@{}", id, destination);
}
message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
// send the combined message + forward headers
- String id = MessagingService.instance().sendRR(message, target, handler);
- logger.trace("Sending message to {}@{}", id, target);
+ Tracing.trace("Enqueuing message to {}", target);
+ MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local " + rm.toString(true));
-
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
@@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean
// Forward the actual update to the chosen leader replica
AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
- if (logger.isTraceEnabled())
- logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+ Tracing.trace("Enqueuing counter update to {}", endpoint);
MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
return responseHandler;
}
@@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand command = commands.get(i);
Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
- logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
@@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean
InetAddress dataPoint = endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
- logger.trace("reading data from {}", dataPoint);
+ Tracing.trace("Enqueuing data request to {}", dataPoint);
MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
}
@@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
- logger.trace("reading digest from {}", digestPoint);
+ Tracing.trace("Enqueuing digest request to {}", dataPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (message == null)
@@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- logger.trace("LocalReadRunnable reading {}", command);
-
Table table = Table.open(command.table);
Row r = command.getRow(table);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
@@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow() throws ExecutionException, InterruptedException
{
- logger.trace("LocalReadRunnable reading {}", command);
-
RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
handler.response(result);
@@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Determining replicas to query");
- logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
long startTime = System.nanoTime();
Table table = Table.open(command.keyspace);
@@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
&& OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
@@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
+ logger.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
- logger.trace("reading {} from {}", nodeCmd, endpoint);
}
}
@@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean
rows.add(row);
if (nodeCmd.countCQL3Rows)
cql3RowCount += row.getLiveCount(commandPredicate);
- logger.trace("range slices read {}", row.key);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean
public void response(MessageIn<UUID> message)
{
// record the response from the remote node.
- logger.trace("Received schema check response from {}", message.from.getHostAddress());
versions.put(message.from, message.payload);
latch.countDown();
}
@@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean
throw new AssertionError("This latch shouldn't have been interrupted.");
}
- logger.trace("My version is {}", myVersion);
-
// maps versions to hosts that are on that version.
Map<String, List<String>> results = new HashMap<String, List<String>>();
Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean
// special case for bounds containing exactly 1 (non-minimum) token
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
{
- logger.trace("restricted single token match for query {}", queryRange);
return Collections.singletonList(queryRange);
}
@@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean
remainder = splits.right;
}
ranges.add(remainder);
- if (logger.isDebugEnabled())
- logger.trace("restricted ranges for query {} are {}", queryRange, ranges);
return ranges;
}
@@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean
if (hintWindowExpired)
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
- logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+ Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
}
return !hintWindowExpired;
}
@@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean
final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
// Send out the truncate calls and track the responses with the callbacks.
- logger.trace("Starting to send truncate messages to hosts {}", allEndpoints);
+ Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
final Truncation truncation = new Truncation(keyspace, cfname);
MessageOut<Truncation> message = truncation.createMessage();
for (InetAddress endpoint : allEndpoints)
MessagingService.instance().sendRR(message, endpoint, responseHandler);
// Wait for all
- logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor);
responseHandler.get();
}
[2/6] git commit: improve tracing; patch by jbellis;
reviewed by slebresne for CASSANDRA-5638
Posted by jb...@apache.org.
improve tracing
patch by jbellis; reviewed by slebresne for CASSANDRA-5638
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec
Branch: refs/heads/trunk
Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f
Parents: f30015c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 14 10:34:57 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:40:57 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/cql3/QueryProcessor.java | 6 +--
.../apache/cassandra/db/ReadVerbHandler.java | 2 -
.../apache/cassandra/service/StorageProxy.java | 50 ++++----------------
3 files changed, 10 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dae9cc9..513c96e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -111,6 +111,7 @@ public class QueryProcessor
private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
+ logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
statement.validate(clientState);
statement.checkAccess(clientState);
@@ -121,7 +122,6 @@ public class QueryProcessor
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
if (prepared.getBoundsTerms() > 0)
throw new InvalidRequestException("Cannot execute query with bind variables");
@@ -187,8 +187,6 @@ public class QueryProcessor
public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
throws RequestValidationException
{
- logger.trace("CQL QUERY: {}", queryString);
-
ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
@@ -245,7 +243,7 @@ public class QueryProcessor
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
- Tracing.trace("Parsing statement");
+ Tracing.trace("Parsing {}", queryStr);
ParsedStatement statement = parseStatement(queryStr);
// Set keyspace for statement that require login
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index a06035a..a05f7a2 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
{
if (command.isDigestQuery())
{
- if (logger.isTraceEnabled())
- logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
return new ReadResponse(ColumnFamily.digest(row.cf));
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5517387..adb3f2d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
runnable.run();
}
@@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
StageManager.getStage(Stage.MUTATION).execute(runnable);
}
@@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
@@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("Determining replicas for atomic batch");
long startTime = System.nanoTime();
- logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean
else
{
// belongs on a different server
- if (logger.isTraceEnabled())
- logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
@@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean
messages = HashMultimap.create();
dcMessages.put(dc, messages);
}
-
messages.put(rm.createMessage(), destination);
}
}
@@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean
CompactEndpointSerializationHelper.serialize(destination, dos);
String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
dos.writeUTF(id);
- logger.trace("Adding FWD message to {}@{}", id, destination);
}
message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
// send the combined message + forward headers
- String id = MessagingService.instance().sendRR(message, target, handler);
- logger.trace("Sending message to {}@{}", id, target);
+ Tracing.trace("Enqueuing message to {}", target);
+ MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local " + rm.toString(true));
-
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
@@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean
// Forward the actual update to the chosen leader replica
AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
- if (logger.isTraceEnabled())
- logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+ Tracing.trace("Enqueuing counter update to {}", endpoint);
MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
return responseHandler;
}
@@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand command = commands.get(i);
Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
- logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
@@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean
InetAddress dataPoint = endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
- logger.trace("reading data from {}", dataPoint);
+ Tracing.trace("Enqueuing data request to {}", dataPoint);
MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
}
@@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
- logger.trace("reading digest from {}", digestPoint);
+ Tracing.trace("Enqueuing digest request to {}", dataPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (message == null)
@@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- logger.trace("LocalReadRunnable reading {}", command);
-
Table table = Table.open(command.table);
Row r = command.getRow(table);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
@@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow() throws ExecutionException, InterruptedException
{
- logger.trace("LocalReadRunnable reading {}", command);
-
RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
handler.response(result);
@@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Determining replicas to query");
- logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
long startTime = System.nanoTime();
Table table = Table.open(command.keyspace);
@@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
&& OPTIMIZE_LOCAL_REQUESTS)
{
- logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
@@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
+ logger.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
- logger.trace("reading {} from {}", nodeCmd, endpoint);
}
}
@@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean
rows.add(row);
if (nodeCmd.countCQL3Rows)
cql3RowCount += row.getLiveCount(commandPredicate);
- logger.trace("range slices read {}", row.key);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean
public void response(MessageIn<UUID> message)
{
// record the response from the remote node.
- logger.trace("Received schema check response from {}", message.from.getHostAddress());
versions.put(message.from, message.payload);
latch.countDown();
}
@@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean
throw new AssertionError("This latch shouldn't have been interrupted.");
}
- logger.trace("My version is {}", myVersion);
-
// maps versions to hosts that are on that version.
Map<String, List<String>> results = new HashMap<String, List<String>>();
Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean
// special case for bounds containing exactly 1 (non-minimum) token
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
{
- logger.trace("restricted single token match for query {}", queryRange);
return Collections.singletonList(queryRange);
}
@@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean
remainder = splits.right;
}
ranges.add(remainder);
- if (logger.isDebugEnabled())
- logger.trace("restricted ranges for query {} are {}", queryRange, ranges);
return ranges;
}
@@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean
if (hintWindowExpired)
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
- logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+ Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
}
return !hintWindowExpired;
}
@@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean
final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
// Send out the truncate calls and track the responses with the callbacks.
- logger.trace("Starting to send truncate messages to hosts {}", allEndpoints);
+ Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
final Truncation truncation = new Truncation(keyspace, cfname);
MessageOut<Truncation> message = truncation.createMessage();
for (InetAddress endpoint : allEndpoints)
MessagingService.instance().sendRR(message, endpoint, responseHandler);
// Wait for all
- logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor);
responseHandler.get();
}
[6/6] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26018be2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26018be2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26018be2
Branch: refs/heads/trunk
Commit: 26018be223663989f0de87ff7a902a36be4df678
Parents: 5b0d43f e5c34d7
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:52:55 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:52:55 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/service/StorageProxy.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26018be2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 9d095cc,c12cace..1383be7
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1137,10 -858,10 +1137,10 @@@ public class StorageProxy implements St
do
{
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
- ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
+ AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
if (!commandsToRetry.isEmpty())
- logger.debug("Retrying {} commands", commandsToRetry.size());
+ Tracing.trace("Retrying {} commands", commandsToRetry.size());
// send out read requests
for (int i = 0; i < commands.size(); i++)
@@@ -1174,25 -947,28 +1174,28 @@@
}
catch (DigestMismatchException ex)
{
- logger.trace("Digest mismatch: {}", ex);
-
- Tracing.trace("Digest mismatch: {}", ex.toString());
-
++ Tracing.trace("Digest mismatch: {}", ex);
++
ReadRepairMetrics.repairedBlocking.mark();
-
+
// Do a full data read to resolve the correct response (and repair node that need be)
- RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
- ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
+ RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+ ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
}
- repairCommands.add(command);
+ repairCommands.add(exec.command);
repairResponseHandlers.add(repairHandler);
- MessageOut<ReadCommand> message = command.createMessage();
- for (InetAddress endpoint : handler.endpoints)
+ MessageOut<ReadCommand> message = exec.command.createMessage();
+ for (InetAddress endpoint : exec.handler.endpoints)
+ {
+ Tracing.trace("Enqueuing full data read to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, repairHandler);
+ }
}
}
[4/6] git commit: cleanup logger.debug too
Posted by jb...@apache.org.
cleanup logger.debug too
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5c34d7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5c34d7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5c34d7c
Branch: refs/heads/trunk
Commit: e5c34d7c29e9ec6dc162210b90fc69ea11a4c331
Parents: 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:51:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:51:39 2013 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/service/StorageProxy.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5c34d7c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index adb3f2d..c12cace 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -861,7 +861,7 @@ public class StorageProxy implements StorageProxyMBean
ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
if (!commandsToRetry.isEmpty())
- logger.debug("Retrying {} commands", commandsToRetry.size());
+ Tracing.trace("Retrying {} commands", commandsToRetry.size());
// send out read requests
for (int i = 0; i < commands.size(); i++)
@@ -947,7 +947,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (DigestMismatchException ex)
{
- logger.debug("Digest mismatch: {}", ex.toString());
+ Tracing.trace("Digest mismatch: {}", ex.toString());
ReadRepairMetrics.repairedBlocking.mark();
@@ -963,9 +963,10 @@ public class StorageProxy implements StorageProxyMBean
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);
+ MessageOut<ReadCommand> message = command.createMessage();
for (InetAddress endpoint : handler.endpoints)
{
- MessageOut<ReadCommand> message = command.createMessage();
+ Tracing.trace("Enqueuing full data read to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, repairHandler);
}
}
@@ -1009,7 +1010,7 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
if (retryCommand != null)
{
- logger.debug("Issuing retry for read command");
+ Tracing.trace("Issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
commandsToRetry = new ArrayList<ReadCommand>();
commandsToRetry.add(retryCommand);
@@ -1193,7 +1194,7 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
- logger.trace("Enqueuing request to {}", endpoint);
+ Tracing.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
}
}
[5/6] git commit: cleanup logger.debug too
Posted by jb...@apache.org.
cleanup logger.debug too
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5c34d7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5c34d7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5c34d7c
Branch: refs/heads/cassandra-1.2
Commit: e5c34d7c29e9ec6dc162210b90fc69ea11a4c331
Parents: 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:51:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:51:39 2013 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/service/StorageProxy.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5c34d7c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index adb3f2d..c12cace 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -861,7 +861,7 @@ public class StorageProxy implements StorageProxyMBean
ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
if (!commandsToRetry.isEmpty())
- logger.debug("Retrying {} commands", commandsToRetry.size());
+ Tracing.trace("Retrying {} commands", commandsToRetry.size());
// send out read requests
for (int i = 0; i < commands.size(); i++)
@@ -947,7 +947,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (DigestMismatchException ex)
{
- logger.debug("Digest mismatch: {}", ex.toString());
+ Tracing.trace("Digest mismatch: {}", ex.toString());
ReadRepairMetrics.repairedBlocking.mark();
@@ -963,9 +963,10 @@ public class StorageProxy implements StorageProxyMBean
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);
+ MessageOut<ReadCommand> message = command.createMessage();
for (InetAddress endpoint : handler.endpoints)
{
- MessageOut<ReadCommand> message = command.createMessage();
+ Tracing.trace("Enqueuing full data read to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, repairHandler);
}
}
@@ -1009,7 +1010,7 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
if (retryCommand != null)
{
- logger.debug("Issuing retry for read command");
+ Tracing.trace("Issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
commandsToRetry = new ArrayList<ReadCommand>();
commandsToRetry.add(retryCommand);
@@ -1193,7 +1194,7 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
- logger.trace("Enqueuing request to {}", endpoint);
+ Tracing.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
}
}
[3/6] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0d43f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0d43f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0d43f6
Branch: refs/heads/trunk
Commit: 5b0d43f6f2485f1439eb9d6a7ad112f6c9a515f3
Parents: 093e188 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:44:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:45:57 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/cql3/QueryProcessor.java | 6 ++--
.../apache/cassandra/db/ReadVerbHandler.java | 2 --
.../apache/cassandra/service/StorageProxy.java | 36 +++-----------------
3 files changed, 6 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ac5afbc,513c96e..1b89fe3
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -109,9 -111,10 +109,10 @@@ public class QueryProcesso
private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
+ logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
- statement.validate(clientState);
statement.checkAccess(clientState);
+ statement.validate(clientState);
ResultMessage result = statement.execute(cl, queryState, variables);
return result == null ? new ResultMessage.Void() : result;
}
@@@ -119,22 -122,10 +120,21 @@@
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
+ return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
+ }
+
+ public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
+ throws RequestExecutionException, RequestValidationException
+ {
- logger.trace("CQL QUERY: {}", queryString);
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
- if (prepared.getBoundsTerms() > 0)
- throw new InvalidRequestException("Cannot execute query with bind variables");
- return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList());
+ if (prepared.getBoundsTerms() != variables.size())
+ throw new InvalidRequestException("Invalid amount of bind variables");
+ return processStatement(prepared, cl, queryState, variables);
+ }
+
+ public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ {
+ return getStatement(queryStr, queryState.getClientState()).statement;
}
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 0203e4b,adb3f2d..9d095cc
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -137,10 -136,8 +137,7 @@@ public class StorageProxy implements St
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
runnable.run();
}
@@@ -153,10 -150,8 +150,7 @@@
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
StageManager.getStage(Stage.MUTATION).execute(runnable);
}
@@@ -779,28 -505,14 +771,27 @@@
else
{
// belongs on a different server
+ if (message == null)
+ message = rm.createMessage();
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
- if (messages == null)
+ // direct writes to local DC or old Cassandra versions
+ // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+ if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
+ {
+ MessagingService.instance().sendRR(message, destination, responseHandler);
+ }
+ else
{
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
+ Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (messages == null)
+ {
+ messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, messages);
+ }
+ messages.add(destination);
}
- messages.put(rm.createMessage(), destination);
}
}
else
@@@ -908,12 -650,9 +899,9 @@@
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local " + rm.toString(true));
-
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
- public void runMayThrow() throws IOException
+ public void runMayThrow()
{
rm.apply();
responseHandler.response(null);
@@@ -1159,17 -867,62 +1146,16 @@@
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
- Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
- logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
- List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
- CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
-
- ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
- endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
-
- if (rrDecision != ReadRepairDecision.NONE) {
- ReadRepairMetrics.attempted.mark();
- }
-
- RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
- ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
- handler.assureSufficientLiveNodes();
- assert !endpoints.isEmpty();
- readCallbacks[i] = handler;
-
- // The data-request message is sent to dataPoint, the node that will actually get the data for us
- InetAddress dataPoint = endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
- {
- Tracing.trace("Enqueuing data request to {}", dataPoint);
- MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
- }
-
- if (endpoints.size() == 1)
- continue;
-
- // send the other endpoints a digest request
- ReadCommand digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- MessageOut message = null;
- for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
- {
- if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- Tracing.trace("Enqueuing digest request to {}", dataPoint);
- // (We lazy-construct the digest Message object since it may not be necessary if we
- // are doing a local digest read, or no digest reads at all.)
- if (message == null)
- message = digestCommand.createMessage();
- MessagingService.instance().sendRR(message, digestPoint, handler);
- }
- }
+ AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+ exec.executeAsync();
+ readExecutors[i] = exec;
}
+ for (AbstractReadExecutor exec: readExecutors)
+ exec.speculate();
+
// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
@@@ -1305,12 -1064,10 +1289,10 @@@
this.handler = handler;
}
- protected void runMayThrow() throws ExecutionException, InterruptedException
+ protected void runMayThrow()
{
- logger.trace("LocalReadRunnable reading {}", command);
-
RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
- MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
handler.response(result);
}
}