You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 15:55:03 UTC

[3/6] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0d43f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0d43f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0d43f6

Branch: refs/heads/trunk
Commit: 5b0d43f6f2485f1439eb9d6a7ad112f6c9a515f3
Parents: 093e188 0c81eae
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 08:44:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 08:45:57 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cql3/QueryProcessor.java   |  6 ++--
 .../apache/cassandra/db/ReadVerbHandler.java    |  2 --
 .../apache/cassandra/service/StorageProxy.java  | 36 +++-----------------
 3 files changed, 6 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ac5afbc,513c96e..1b89fe3
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -109,9 -111,10 +109,10 @@@ public class QueryProcesso
      private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
      throws RequestExecutionException, RequestValidationException
      {
+         logger.trace("Process {} @CL.{}", statement, cl);
          ClientState clientState = queryState.getClientState();
 -        statement.validate(clientState);
          statement.checkAccess(clientState);
 +        statement.validate(clientState);
          ResultMessage result = statement.execute(cl, queryState, variables);
          return result == null ? new ResultMessage.Void() : result;
      }
@@@ -119,22 -122,10 +120,21 @@@
      public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
      throws RequestExecutionException, RequestValidationException
      {
 +        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
 +    }
 +
 +    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
 +    throws RequestExecutionException, RequestValidationException
 +    {
-         logger.trace("CQL QUERY: {}", queryString);
          CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
 -        if (prepared.getBoundsTerms() > 0)
 -            throw new InvalidRequestException("Cannot execute query with bind variables");
 -        return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList());
 +        if (prepared.getBoundsTerms() != variables.size())
 +            throw new InvalidRequestException("Invalid amount of bind variables");
 +        return processStatement(prepared, cl, queryState, variables);
 +    }
 +
 +    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
 +    {
 +        return getStatement(queryStr, queryState.getClientState()).statement;
      }
  
      public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 0203e4b,adb3f2d..9d095cc
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -137,10 -136,8 +137,7 @@@ public class StorageProxy implements St
                                AbstractWriteResponseHandler responseHandler,
                                String localDataCenter,
                                ConsistencyLevel consistency_level)
 -            throws IOException
              {
-                 if (logger.isTraceEnabled())
-                     logger.trace("insert writing local & replicate " + mutation.toString(true));
- 
                  Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                  runnable.run();
              }
@@@ -153,10 -150,8 +150,7 @@@
                                AbstractWriteResponseHandler responseHandler,
                                String localDataCenter,
                                ConsistencyLevel consistency_level)
 -            throws IOException
              {
-                 if (logger.isTraceEnabled())
-                     logger.trace("insert writing local & replicate " + mutation.toString(true));
- 
                  Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                  StageManager.getStage(Stage.MUTATION).execute(runnable);
              }
@@@ -779,28 -505,14 +771,27 @@@
                  else
                  {
                      // belongs on a different server
 +                    if (message == null)
 +                        message = rm.createMessage();
- 
                      String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
 -                    Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
 -                    if (messages == null)
 +                    // direct writes to local DC or old Cassandra versions
 +                    // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
 +                    if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
 +                    {
 +                        MessagingService.instance().sendRR(message, destination, responseHandler);
 +                    }
 +                    else
                      {
 -                        messages = HashMultimap.create();
 -                        dcMessages.put(dc, messages);
 +                        Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
 +                        if (messages == null)
 +                        {
 +                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
 +                            if (dcGroups == null)
 +                                dcGroups = new HashMap<String, Collection<InetAddress>>();
 +                            dcGroups.put(dc, messages);
 +                        }
 +                        messages.add(destination);
                      }
 -                    messages.put(rm.createMessage(), destination);
                  }
              }
              else
@@@ -908,12 -650,9 +899,9 @@@
  
      private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
      {
-         if (logger.isTraceEnabled())
-             logger.trace("insert writing local " + rm.toString(true));
- 
          Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
          {
 -            public void runMayThrow() throws IOException
 +            public void runMayThrow()
              {
                  rm.apply();
                  responseHandler.response(null);
@@@ -1159,17 -867,62 +1146,16 @@@
              for (int i = 0; i < commands.size(); i++)
              {
                  ReadCommand command = commands.get(i);
 -                Table table = Table.open(command.getKeyspace());
                  assert !command.isDigestQuery();
-                 logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
  
 -                List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
 -                CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
 -
 -                ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
 -                endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
 -                
 -                if (rrDecision != ReadRepairDecision.NONE) {
 -                    ReadRepairMetrics.attempted.mark();
 -                }
 -
 -                RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
 -                ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
 -                handler.assureSufficientLiveNodes();
 -                assert !endpoints.isEmpty();
 -                readCallbacks[i] = handler;
 -
 -                // The data-request message is sent to dataPoint, the node that will actually get the data for us
 -                InetAddress dataPoint = endpoints.get(0);
 -                if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -                {
 -                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
 -                }
 -                else
 -                {
 -                    Tracing.trace("Enqueuing data request to {}", dataPoint);
 -                    MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
 -                }
 -
 -                if (endpoints.size() == 1)
 -                    continue;
 -
 -                // send the other endpoints a digest request
 -                ReadCommand digestCommand = command.copy();
 -                digestCommand.setDigestQuery(true);
 -                MessageOut message = null;
 -                for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
 -                {
 -                    if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
 -                    }
 -                    else
 -                    {
 -                        Tracing.trace("Enqueuing digest request to {}", dataPoint);
 -                        // (We lazy-construct the digest Message object since it may not be necessary if we
 -                        // are doing a local digest read, or no digest reads at all.)
 -                        if (message == null)
 -                            message = digestCommand.createMessage();
 -                        MessagingService.instance().sendRR(message, digestPoint, handler);
 -                    }
 -                }
 +                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
 +                exec.executeAsync();
 +                readExecutors[i] = exec;
              }
  
 +            for (AbstractReadExecutor exec: readExecutors)
 +                exec.speculate();
 +
              // read results and make a second pass for any digest mismatches
              List<ReadCommand> repairCommands = null;
              List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
@@@ -1305,12 -1064,10 +1289,10 @@@
              this.handler = handler;
          }
  
 -        protected void runMayThrow() throws ExecutionException, InterruptedException
 +        protected void runMayThrow()
          {
-             logger.trace("LocalReadRunnable reading {}", command);
- 
              RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
 -            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
 +            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
              handler.response(result);
          }
      }