You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 15:55:03 UTC
[3/6] git commit: merge from 1.2
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0d43f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0d43f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0d43f6
Branch: refs/heads/trunk
Commit: 5b0d43f6f2485f1439eb9d6a7ad112f6c9a515f3
Parents: 093e188 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:44:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:45:57 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/cql3/QueryProcessor.java | 6 ++--
.../apache/cassandra/db/ReadVerbHandler.java | 2 --
.../apache/cassandra/service/StorageProxy.java | 36 +++-----------------
3 files changed, 6 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ac5afbc,513c96e..1b89fe3
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -109,9 -111,10 +109,10 @@@ public class QueryProcesso
private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
throws RequestExecutionException, RequestValidationException
{
+ logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
- statement.validate(clientState);
statement.checkAccess(clientState);
+ statement.validate(clientState);
ResultMessage result = statement.execute(cl, queryState, variables);
return result == null ? new ResultMessage.Void() : result;
}
@@@ -119,22 -122,10 +120,21 @@@
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
+ return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
+ }
+
+ public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
+ throws RequestExecutionException, RequestValidationException
+ {
- logger.trace("CQL QUERY: {}", queryString);
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
- if (prepared.getBoundsTerms() > 0)
- throw new InvalidRequestException("Cannot execute query with bind variables");
- return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList());
+ if (prepared.getBoundsTerms() != variables.size())
+ throw new InvalidRequestException("Invalid amount of bind variables");
+ return processStatement(prepared, cl, queryState, variables);
+ }
+
+ public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ {
+ return getStatement(queryStr, queryState.getClientState()).statement;
}
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 0203e4b,adb3f2d..9d095cc
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -137,10 -136,8 +137,7 @@@ public class StorageProxy implements St
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
runnable.run();
}
@@@ -153,10 -150,8 +150,7 @@@
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local & replicate " + mutation.toString(true));
-
Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
StageManager.getStage(Stage.MUTATION).execute(runnable);
}
@@@ -779,28 -505,14 +771,27 @@@
else
{
// belongs on a different server
+ if (message == null)
+ message = rm.createMessage();
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
- if (messages == null)
+ // direct writes to local DC or old Cassandra versions
+ // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+ if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
+ {
+ MessagingService.instance().sendRR(message, destination, responseHandler);
+ }
+ else
{
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
+ Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (messages == null)
+ {
+ messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, messages);
+ }
+ messages.add(destination);
}
- messages.put(rm.createMessage(), destination);
}
}
else
@@@ -908,12 -650,9 +899,9 @@@
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
{
- if (logger.isTraceEnabled())
- logger.trace("insert writing local " + rm.toString(true));
-
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
- public void runMayThrow() throws IOException
+ public void runMayThrow()
{
rm.apply();
responseHandler.response(null);
@@@ -1159,17 -867,62 +1146,16 @@@
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
- Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
- logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
- List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
- CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
-
- ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
- endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
-
- if (rrDecision != ReadRepairDecision.NONE) {
- ReadRepairMetrics.attempted.mark();
- }
-
- RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
- ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
- handler.assureSufficientLiveNodes();
- assert !endpoints.isEmpty();
- readCallbacks[i] = handler;
-
- // The data-request message is sent to dataPoint, the node that will actually get the data for us
- InetAddress dataPoint = endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
- {
- Tracing.trace("Enqueuing data request to {}", dataPoint);
- MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
- }
-
- if (endpoints.size() == 1)
- continue;
-
- // send the other endpoints a digest request
- ReadCommand digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- MessageOut message = null;
- for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
- {
- if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- Tracing.trace("Enqueuing digest request to {}", dataPoint);
- // (We lazy-construct the digest Message object since it may not be necessary if we
- // are doing a local digest read, or no digest reads at all.)
- if (message == null)
- message = digestCommand.createMessage();
- MessagingService.instance().sendRR(message, digestPoint, handler);
- }
- }
+ AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+ exec.executeAsync();
+ readExecutors[i] = exec;
}
+ for (AbstractReadExecutor exec: readExecutors)
+ exec.speculate();
+
// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
@@@ -1305,12 -1064,10 +1289,10 @@@
this.handler = handler;
}
- protected void runMayThrow() throws ExecutionException, InterruptedException
+ protected void runMayThrow()
{
- logger.trace("LocalReadRunnable reading {}", command);
-
RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
- MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
handler.response(result);
}
}