Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:11:19 UTC

[4/8] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DigestResolver.java
index 572df6f,0000000..62b4538
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@@ -1,98 -1,0 +1,98 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service;
 +
 +import java.nio.ByteBuffer;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.net.MessageIn;
 +
 +public class DigestResolver extends ResponseResolver
 +{
 +    private volatile ReadResponse dataResponse;
 +
 +    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    {
 +        super(keyspace, command, consistency, maxResponseCount);
 +    }
 +
 +    @Override
 +    public void preprocess(MessageIn<ReadResponse> message)
 +    {
 +        super.preprocess(message);
 +        if (dataResponse == null && !message.payload.isDigestResponse())
 +            dataResponse = message.payload;
 +    }
 +
 +    /**
 +     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
 +     */
 +    public PartitionIterator getData()
 +    {
 +        assert isDataPresent();
 +        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
 +    }
 +
 +    /*
 +     * This method handles two different scenarios:
 +     *
 +     * a) we're handling the initial read of data from the closest replica + digests
 +     *    from the rest. In this case we check the digests against each other,
 +     *    throw an exception if there is a mismatch, otherwise return the data row.
 +     *
 +     * b) we're checking additional digests that arrived after the minimum to handle
 +     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
 +     */
 +    public PartitionIterator resolve() throws DigestMismatchException
 +    {
 +        if (responses.size() == 1)
 +            return getData();
 +
-         if (logger.isDebugEnabled())
-             logger.debug("resolving {} responses", responses.size());
++        if (logger.isTraceEnabled())
++            logger.trace("resolving {} responses", responses.size());
 +
 +        long start = System.nanoTime();
 +
 +        // validate digests against each other; throw immediately on mismatch.
 +        ByteBuffer digest = null;
 +        for (MessageIn<ReadResponse> message : responses)
 +        {
 +            ReadResponse response = message.payload;
 +
 +            ByteBuffer newDigest = response.digest(command.metadata(), command);
 +            if (digest == null)
 +                digest = newDigest;
 +            else if (!digest.equals(newDigest))
 +                // rely on the fact that only single partition queries use digests
 +                throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
 +        }
 +
-         if (logger.isDebugEnabled())
-             logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
++        if (logger.isTraceEnabled())
++            logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return dataResponse != null;
 +    }
 +}
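
A minimal, self-contained sketch of the guarded trace-logging pattern this
merge standardizes on (class name hypothetical; the SLF4J calls are the ones
used in the hunks above):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceGuardSketch
{
    private static final Logger logger = LoggerFactory.getLogger(TraceGuardSketch.class);

    public void resolveExample(int responseCount)
    {
        // With the guard, the format arguments are never boxed or evaluated
        // when TRACE is disabled, which matters on read/write hot paths.
        if (logger.isTraceEnabled())
            logger.trace("resolving {} responses", responseCount);
    }
}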

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index 8b1ef32,145679d..8747004
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -108,46 -102,31 +108,46 @@@ public class ReadCallback implements IA
          }
      }
  
 -    public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
 +    public void awaitResults() throws ReadFailureException, ReadTimeoutException
      {
 -        if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
 +        boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
 +        boolean failed = blockfor + failures > endpoints.size();
 +        if (signaled && !failed)
 +            return;
 +
 +        if (Tracing.isTracing())
          {
 -            // Same as for writes, see AbstractWriteResponseHandler
 -            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
 -            Tracing.trace("Read timeout: {}", ex.toString());
 -            if (logger.isTraceEnabled())
 -                logger.trace("Read timeout: {}", ex.toString());
 -            throw ex;
 +            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
 +            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
          }
 -
 -        if (blockfor + failures > endpoints.size())
 +        else if (logger.isDebugEnabled())
          {
 -            ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
 -
 -            if (logger.isTraceEnabled())
 -                logger.trace("Read failure: {}", ex.toString());
 -            throw ex;
 +            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
 +            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
          }
  
 -        return blockfor == 1 ? resolver.getData() : resolver.resolve();
 +        // Same as for writes, see AbstractWriteResponseHandler
 +        throw failed
 +            ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent())
 +            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
      }
  
 -    public void response(MessageIn<TMessage> message)
 +    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
 +    {
 +        awaitResults();
 +
 +        PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
-         if (logger.isDebugEnabled())
-             logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
++        if (logger.isTraceEnabled())
++            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +        return result;
 +    }
 +
 +    public int blockFor()
 +    {
 +        return blockfor;
 +    }
 +
 +    public void response(MessageIn<ReadResponse> message)
      {
          resolver.preprocess(message);
          int n = waitingFor(message.from)
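
The refactored awaitResults() above separates waiting from outcome
classification. A condensed, self-contained model of that classification
(all names are hypothetical stand-ins for the ReadCallback fields in the diff):

public class AwaitResultsSketch
{
    enum Outcome { SUCCESS, TIMED_OUT, FAILED }

    static Outcome classify(boolean signaled, int blockFor, int failures, int endpointCount)
    {
        // A read has failed outright once enough replicas reported errors
        // that blockFor can no longer be met, even if the latch was signaled.
        boolean failed = blockFor + failures > endpointCount;
        if (signaled && !failed)
            return Outcome.SUCCESS;
        // Same convention as AbstractWriteResponseHandler: explicit failures
        // take precedence over a plain timeout.
        return failed ? Outcome.FAILED : Outcome.TIMED_OUT;
    }
}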

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index d209af6,af56c3a..5c94f08
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -819,86 -703,59 +819,86 @@@ public class StorageProxy implements St
          }
      }
  
 -    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
 +    public static boolean canDoLocalRequest(InetAddress replica)
 +    {
 +        return replica.equals(FBUtilities.getBroadcastAddress());
 +    }
 +
 +    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
 +    throws WriteTimeoutException, WriteFailureException
 +    {
 +        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
 +                                                                     Collections.<InetAddress>emptyList(),
 +                                                                     endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
 +                                                                     Keyspace.open(SystemKeyspace.NAME),
 +                                                                     null,
 +                                                                     WriteType.BATCH_LOG);
 +
 +        Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
 +
 +        if (!endpoints.current.isEmpty())
 +            syncWriteToBatchlog(handler, batch, endpoints.current);
 +
 +        if (!endpoints.legacy.isEmpty())
 +            LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
 +
 +        handler.get();
 +    }
 +
 +    private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
      throws WriteTimeoutException, WriteFailureException
      {
 -        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
 -                                                                        Collections.<InetAddress>emptyList(),
 -                                                                        ConsistencyLevel.ONE,
 -                                                                        Keyspace.open(SystemKeyspace.NAME),
 -                                                                        null,
 -                                                                        WriteType.BATCH_LOG);
 +        MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
  
 -        MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
 -                                                      .createMessage();
          for (InetAddress target : endpoints)
          {
-             logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
 -            int targetVersion = MessagingService.instance().getVersion(target);
 -            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -            {
 -                insertLocal(message.payload, handler);
 -            }
 -            else if (targetVersion == MessagingService.current_version)
 -            {
 -                MessagingService.instance().sendRR(message, target, handler, false);
 -            }
++            logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
 +
 +            if (canDoLocalRequest(target))
 +                performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
              else
 -            {
 -                MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
 -                                                                  .createMessage(),
 -                                                   target,
 -                                                   handler,
 -                                                   false);
 -            }
 +                MessagingService.instance().sendRR(message, target, handler);
          }
 +    }
  
 -        handler.get();
 +    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
 +    {
 +        if (!endpoints.current.isEmpty())
 +            asyncRemoveFromBatchlog(endpoints.current, uuid);
 +
 +        if (!endpoints.legacy.isEmpty())
 +            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
      }
  
      private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
      {
 -        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
 -                                                                        Collections.<InetAddress>emptyList(),
 -                                                                        ConsistencyLevel.ANY,
 -                                                                        Keyspace.open(SystemKeyspace.NAME),
 -                                                                        null,
 -                                                                        WriteType.SIMPLE);
 -        Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid));
 -        mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
 -        MessageOut<Mutation> message = mutation.createMessage();
 +        MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
          for (InetAddress target : endpoints)
          {
-             if (logger.isDebugEnabled())
-                 logger.debug("Sending batchlog remove request {} to {}", uuid, target);
 -            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
 -                insertLocal(message.payload, handler);
++            if (logger.isTraceEnabled())
++                logger.trace("Sending batchlog remove request {} to {}", uuid, target);
 +
 +            if (canDoLocalRequest(target))
 +                performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
              else
 -                MessagingService.instance().sendRR(message, target, handler, false);
 +                MessagingService.instance().sendOneWay(message, target);
 +        }
 +    }
 +
 +    private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
 +    {
 +        for (WriteResponseHandlerWrapper wrapper : wrappers)
 +        {
 +            Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
 +
 +            try
 +            {
 +                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
 +            }
 +            catch (OverloadedException | WriteTimeoutException e)
 +            {
 +                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress());
 +            }
          }
      }
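
A recurring shape in the batchlog changes above is the local-replica fast
path: when the target is this node, skip serialization and the messaging
stack entirely. A self-contained sketch of that dispatch, with hypothetical
plumbing standing in for FBUtilities and MessagingService:

import java.net.InetAddress;

public class LocalDispatchSketch
{
    private final InetAddress broadcastAddress; // stand-in for FBUtilities.getBroadcastAddress()

    LocalDispatchSketch(InetAddress broadcastAddress)
    {
        this.broadcastAddress = broadcastAddress;
    }

    boolean canDoLocalRequest(InetAddress replica)
    {
        // Same test as StorageProxy.canDoLocalRequest in the diff.
        return replica.equals(broadcastAddress);
    }

    void dispatch(InetAddress target, Runnable localWork, Runnable remoteSend)
    {
        if (canDoLocalRequest(target))
            localWork.run();   // e.g. performLocally(Stage.MUTATION, ...)
        else
            remoteSend.run();  // e.g. MessagingService.instance().sendRR(...)
    }
}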
  
@@@ -1522,144 -1347,218 +1522,144 @@@
       * 4. If the digests (if any) match the data return the data
       * 5. else carry out read repair by getting data from all the nodes.
       */
 -    private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
 +    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
      throws UnavailableException, ReadFailureException, ReadTimeoutException
      {
 -        List<Row> rows = new ArrayList<>(initialCommands.size());
 -        // (avoid allocating a new list in the common case of nothing-to-retry)
 -        List<ReadCommand> commandsToRetry = Collections.emptyList();
 +        int cmdCount = commands.size();
  
 -        do
 -        {
 -            List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
 -            AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
 +        SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
 +        for (int i = 0; i < cmdCount; i++)
 +            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
  
 -            if (!commandsToRetry.isEmpty())
 -                Tracing.trace("Retrying {} commands", commandsToRetry.size());
 +        for (int i = 0; i < cmdCount; i++)
 +            reads[i].doInitialQueries();
  
 -            // send out read requests
 -            for (int i = 0; i < commands.size(); i++)
 -            {
 -                ReadCommand command = commands.get(i);
 -                assert !command.isDigestQuery();
 +        for (int i = 0; i < cmdCount; i++)
 +            reads[i].maybeTryAdditionalReplicas();
  
 -                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
 -                exec.executeAsync();
 -                readExecutors[i] = exec;
 -            }
 +        for (int i = 0; i < cmdCount; i++)
 +            reads[i].awaitResultsAndRetryOnDigestMismatch();
  
 -            for (AbstractReadExecutor exec : readExecutors)
 -                exec.maybeTryAdditionalReplicas();
 +        for (int i = 0; i < cmdCount; i++)
 +            if (!reads[i].isDone())
 +                reads[i].maybeAwaitFullDataRead();
  
 -            // read results and make a second pass for any digest mismatches
 -            List<ReadCommand> repairCommands = null;
 -            List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
 -            for (AbstractReadExecutor exec: readExecutors)
 -            {
 -                try
 -                {
 -                    Row row = exec.get();
 -                    if (row != null)
 -                    {
 -                        row = exec.command.maybeTrim(row);
 -                        rows.add(row);
 -                    }
 +        List<PartitionIterator> results = new ArrayList<>(cmdCount);
 +        for (int i = 0; i < cmdCount; i++)
 +        {
 +            assert reads[i].isDone();
 +            results.add(reads[i].getResult());
 +        }
  
 -                    if (logger.isTraceEnabled())
 -                        logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
 -                }
 -                catch (ReadTimeoutException|ReadFailureException ex)
 -                {
 -                    int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
 -                    int responseCount = exec.handler.getReceivedCount();
 -                    String gotData = responseCount > 0
 -                                   ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                   : "";
 -
 -                    boolean isTimeout = ex instanceof ReadTimeoutException;
 -                    if (Tracing.isTracing())
 -                    {
 -                        Tracing.trace("{}; received {} of {} responses{}",
 -                                      isTimeout ? "Timed out" : "Failed", responseCount, blockFor, gotData);
 -                    }
 -                    else if (logger.isDebugEnabled())
 -                    {
 -                        logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData);
 -                    }
 -                    throw ex;
 -                }
 -                catch (DigestMismatchException ex)
 -                {
 -                    Tracing.trace("Digest mismatch: {}", ex);
 +        return PartitionIterators.concat(results);
 +    }
  
 -                    ReadRepairMetrics.repairedBlocking.mark();
 +    private static class SinglePartitionReadLifecycle
 +    {
 +        private final SinglePartitionReadCommand<?> command;
 +        private final AbstractReadExecutor executor;
 +        private final ConsistencyLevel consistency;
  
 -                    // Do a full data read to resolve the correct response (and repair node that need be)
 -                    RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size());
 -                    ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
 -                                                                                       ConsistencyLevel.ALL,
 -                                                                                       exec.getContactedReplicas().size(),
 -                                                                                       exec.command,
 -                                                                                       Keyspace.open(exec.command.getKeyspace()),
 -                                                                                       exec.handler.endpoints);
 +        private PartitionIterator result;
 +        private ReadCallback repairHandler;
  
 -                    if (repairCommands == null)
 -                    {
 -                        repairCommands = new ArrayList<>();
 -                        repairResponseHandlers = new ArrayList<>();
 -                    }
 -                    repairCommands.add(exec.command);
 -                    repairResponseHandlers.add(repairHandler);
 +        SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency)
 +        {
 +            this.command = command;
 +            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
 +            this.consistency = consistency;
 +        }
  
 -                    MessageOut<ReadCommand> message = exec.command.createMessage();
 -                    for (InetAddress endpoint : exec.getContactedReplicas())
 -                    {
 -                        Tracing.trace("Enqueuing full data read to {}", endpoint);
 -                        MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
 -                    }
 -                }
 -            }
 +        boolean isDone()
 +        {
 +            return result != null;
 +        }
  
 -            commandsToRetry.clear();
 +        void doInitialQueries()
 +        {
 +            executor.executeAsync();
 +        }
  
 -            // read the results for the digest mismatch retries
 -            if (repairResponseHandlers != null)
 -            {
 -                for (int i = 0; i < repairCommands.size(); i++)
 -                {
 -                    ReadCommand command = repairCommands.get(i);
 -                    ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
 +        void maybeTryAdditionalReplicas()
 +        {
 +            executor.maybeTryAdditionalReplicas();
 +        }
  
 -                    Row row;
 -                    try
 -                    {
 -                        row = handler.get();
 -                    }
 -                    catch (DigestMismatchException e)
 -                    {
 -                        throw new AssertionError(e); // full data requested from each node here, no digests should be sent
 -                    }
 -                    catch (ReadTimeoutException e)
 -                    {
 -                        if (Tracing.isTracing())
 -                            Tracing.trace("Timed out waiting on digest mismatch repair requests");
 -                        else
 -                            logger.trace("Timed out waiting on digest mismatch repair requests");
 -                        // the caught exception here will have CL.ALL from the repair command,
 -                        // not whatever CL the initial command was at (CASSANDRA-7947)
 -                        int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
 -                        throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
 -                    }
 +        void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException
 +        {
 +            try
 +            {
 +                result = executor.get();
 +            }
 +            catch (DigestMismatchException ex)
 +            {
 +                Tracing.trace("Digest mismatch: {}", ex);
  
 -                    RowDataResolver resolver = (RowDataResolver)handler.resolver;
 -                    try
 -                    {
 -                        // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
 -                        // behind on writes in case the out-of-sync row is read multiple times in quick succession
 -                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 -                    }
 -                    catch (TimeoutException e)
 -                    {
 -                        if (Tracing.isTracing())
 -                            Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements");
 -                        else
 -                            logger.trace("Timed out waiting on digest mismatch repair acknowledgements");
 -                        int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
 -                        throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
 -                    }
 +                ReadRepairMetrics.repairedBlocking.mark();
  
 -                    // retry any potential short reads
 -                    ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
 -                    if (retryCommand != null)
 -                    {
 -                        Tracing.trace("Issuing retry for read command");
 -                        if (commandsToRetry == Collections.EMPTY_LIST)
 -                            commandsToRetry = new ArrayList<>();
 -                        commandsToRetry.add(retryCommand);
 -                        continue;
 -                    }
 +                // Do a full data read to resolve the correct response (and repair any nodes that need it)
 +                Keyspace keyspace = Keyspace.open(command.metadata().ksName);
 +                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
 +                repairHandler = new ReadCallback(resolver,
 +                                                 ConsistencyLevel.ALL,
 +                                                 executor.getContactedReplicas().size(),
 +                                                 command,
 +                                                 keyspace,
 +                                                 executor.handler.endpoints);
  
 -                    if (row != null)
 -                    {
 -                        row = command.maybeTrim(row);
 -                        rows.add(row);
 -                    }
 +                for (InetAddress endpoint : executor.getContactedReplicas())
 +                {
 +                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
 +                    Tracing.trace("Enqueuing full data read to {}", endpoint);
 +                    MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
                  }
              }
 -        } while (!commandsToRetry.isEmpty());
 -
 -        return rows;
 -    }
 -
 -    static class LocalReadRunnable extends DroppableRunnable
 -    {
 -        private final ReadCommand command;
 -        private final ReadCallback<ReadResponse, Row> handler;
 -        private final long start = System.nanoTime();
 -
 -        LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler)
 -        {
 -            super(MessagingService.Verb.READ);
 -            this.command = command;
 -            this.handler = handler;
          }
  
 -        protected void runMayThrow()
 +        void maybeAwaitFullDataRead() throws ReadTimeoutException
          {
 +            // There wasn't a digest mismatch, we're good
 +            if (repairHandler == null)
 +                return;
 +
 +            // Otherwise, get the result from the full-data read and check that it's not a short read
              try
              {
 -                Keyspace keyspace = Keyspace.open(command.ksName);
 -                Row r = command.getRow(keyspace);
 -                ReadResponse result = ReadVerbHandler.getResponse(command, r);
 -                MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 -                handler.response(result);
 +                result = repairHandler.get();
              }
 -            catch (Throwable t)
 +            catch (DigestMismatchException e)
              {
 -                handler.onFailure(FBUtilities.getBroadcastAddress());
 -                if (t instanceof TombstoneOverwhelmingException)
 -                    logger.error(t.getMessage());
 +                throw new AssertionError(e); // full data requested from each node here, no digests should be sent
 +            }
 +            catch (ReadTimeoutException e)
 +            {
 +                if (Tracing.isTracing())
 +                    Tracing.trace("Timed out waiting on digest mismatch repair requests");
                  else
-                     logger.debug("Timed out waiting on digest mismatch repair requests");
 -                    throw t;
++                    logger.trace("Timed out waiting on digest mismatch repair requests");
 +                // the caught exception here will have CL.ALL from the repair command,
 +                // not whatever CL the initial command was at (CASSANDRA-7947)
 +                int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName));
 +                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
              }
          }
 +
 +        PartitionIterator getResult()
 +        {
 +            assert result != null;
 +            return result;
 +        }
      }
  
 -    static class LocalRangeSliceRunnable extends DroppableRunnable
 +    static class LocalReadRunnable extends DroppableRunnable
      {
 -        private final AbstractRangeCommand command;
 -        private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
 +        private final ReadCommand command;
 +        private final ReadCallback handler;
          private final long start = System.nanoTime();
  
 -        LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
 +        LocalReadRunnable(ReadCommand command, ReadCallback handler)
          {
 -            super(MessagingService.Verb.RANGE_SLICE);
 +            super(MessagingService.Verb.READ);
              this.command = command;
              this.handler = handler;
          }
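
The fetchRows() rewrite above drives every command through one phase before
starting the next, so the network round-trips for all partitions overlap. A
schematic sketch of that phased execution (interface and class names
hypothetical):

import java.util.List;

interface ReadLifecycle
{
    void doInitialQueries();                     // fire the async reads
    void maybeTryAdditionalReplicas();           // speculative-retry window
    void awaitResultsAndRetryOnDigestMismatch(); // first-pass resolution
    boolean isDone();
    void maybeAwaitFullDataRead();               // digest-mismatch repair read
}

class PhasedReads
{
    static void run(List<ReadLifecycle> reads)
    {
        // Each loop completes for every read before the next loop begins,
        // keeping all requests in flight concurrently.
        for (ReadLifecycle r : reads) r.doInitialQueries();
        for (ReadLifecycle r : reads) r.maybeTryAdditionalReplicas();
        for (ReadLifecycle r : reads) r.awaitResultsAndRetryOnDigestMismatch();
        for (ReadLifecycle r : reads)
            if (!r.isDone())
                r.maybeAwaitFullDataRead();
    }
}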
@@@ -1832,199 -1661,253 +1832,199 @@@
          }
      }
  
 -    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
 -    throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        Tracing.trace("Computing ranges to query");
 -        long startTime = System.nanoTime();
 +        private final ReadCallback handler;
 +        private PartitionIterator result;
  
 -        Keyspace keyspace = Keyspace.open(command.keyspace);
 -        List<Row> rows;
 -        // now scan until we have enough results
 -        try
 +        private SingleRangeResponse(ReadCallback handler)
          {
 -            int liveRowCount = 0;
 -            boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
 -            rows = new ArrayList<>();
 +            this.handler = handler;
 +        }
  
 -            // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
 -            // expensive in clusters with vnodes)
 -            List<? extends AbstractBounds<RowPosition>> ranges;
 -            if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
 -                ranges = command.keyRange.unwrap();
 -            else
 -                ranges = getRestrictedRanges(command.keyRange);
 +        private void waitForResponse() throws ReadTimeoutException
 +        {
 +            if (result != null)
 +                return;
  
 -            // determine the number of rows to be fetched and the concurrency factor
 -            int rowsToBeFetched = command.limit();
 -            int concurrencyFactor;
 -            if (command.requiresScanningAllRanges())
 +            try
              {
 -                // all nodes must be queried
 -                rowsToBeFetched *= ranges.size();
 -                concurrencyFactor = ranges.size();
 -                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
 -                              ranges.size(), concurrencyFactor);
 +                result = handler.get();
              }
 -            else
 +            catch (DigestMismatchException e)
              {
 -                // our estimate of how many result rows there will be per-range
 -                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
 -                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 -                // fetch enough rows in the first round
 -                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 -                concurrencyFactor = resultRowsPerRange == 0.0
 -                                  ? 1
 -                                  : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
 -
 -                logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             resultRowsPerRange,
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
 -                              ranges.size(),
 -                              concurrencyFactor,
 -                              resultRowsPerRange);
 +                throw new AssertionError(e); // no digests in range slices yet
              }
 +        }
 +
 +        protected RowIterator computeNext()
 +        {
 +            waitForResponse();
 +            return result.hasNext() ? result.next() : endOfData();
 +        }
 +
 +        public void close()
 +        {
 +            if (result != null)
 +                result.close();
 +        }
 +    }
 +
 +    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
 +    {
 +        private final Iterator<RangeForQuery> ranges;
 +        private final int totalRangeCount;
 +        private final PartitionRangeReadCommand command;
 +        private final Keyspace keyspace;
 +        private final ConsistencyLevel consistency;
 +
 +        private final long startTime;
 +        private CountingPartitionIterator sentQueryIterator;
 +
 +        private int concurrencyFactor;
 +        // The following two metrics are maintained to improve the concurrencyFactor
 +        // when it was not good enough initially.
 +        private int liveReturned;
 +        private int rangesQueried;
 +
 +        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
 +        {
 +            this.command = command;
 +            this.concurrencyFactor = concurrencyFactor;
 +            this.startTime = System.nanoTime();
 +            this.ranges = new RangeMerger(ranges, keyspace, consistency);
 +            this.totalRangeCount = ranges.rangeCount();
 +            this.consistency = consistency;
 +            this.keyspace = keyspace;
 +        }
  
 -            boolean haveSufficientRows = false;
 -            int i = 0;
 -            AbstractBounds<RowPosition> nextRange = null;
 -            List<InetAddress> nextEndpoints = null;
 -            List<InetAddress> nextFilteredEndpoints = null;
 -            while (i < ranges.size())
 +        public RowIterator computeNext()
 +        {
 +            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
              {
 -                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 -                int concurrentFetchStartingIndex = i;
 -                int concurrentRequests = 0;
 -                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
 +                // If we don't have any more ranges to handle, we're done
 +                if (!ranges.hasNext())
 +                    return endOfData();
 +
 +                // otherwise, send the next batch of concurrent queries (after closing the previous iterator)
 +                if (sentQueryIterator != null)
                  {
 -                    AbstractBounds<RowPosition> range = nextRange == null
 -                                                      ? ranges.get(i)
 -                                                      : nextRange;
 -                    List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                    : nextEndpoints;
 -                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                        : nextFilteredEndpoints;
 -                    ++i;
 -                    ++concurrentRequests;
 -
 -                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                    while (i < ranges.size())
 -                    {
 -                        nextRange = ranges.get(i);
 -                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                        // don't know how to deal with a wrapping range.
 -                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 -                        // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 -                        // wire compatibility, so It's likely easier not to bother;
 -                        if (range.right.isMinimum())
 -                            break;
 -
 -                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                        // Check if there is enough endpoint for the merge to be possible.
 -                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                            break;
 -
 -                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                        // Estimate whether merging will be a win or not
 -                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                            break;
 -
 -                        // If we get there, merge this range and the next one
 -                        range = range.withNewRight(nextRange.right);
 -                        liveEndpoints = merged;
 -                        filteredEndpoints = filteredMerged;
 -                        ++i;
 -                    }
 +                    liveReturned += sentQueryIterator.counter().counted();
 +                    sentQueryIterator.close();
  
 -                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 -
 -                    // collect replies and resolve according to consistency level
 -                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                    handler.assureSufficientLiveNodes();
 -                    resolver.setSources(filteredEndpoints);
 -                    if (filteredEndpoints.size() == 1
 -                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 -                        && OPTIMIZE_LOCAL_REQUESTS)
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
 -                    }
 -                    else
 -                    {
 -                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                        for (InetAddress endpoint : filteredEndpoints)
 -                        {
 -                            Tracing.trace("Enqueuing request to {}", endpoint);
 -                            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 -                        }
 -                    }
 -                    scanHandlers.add(Pair.create(nodeCmd, handler));
 +                    // It's not the first batch of queries and we're not done, so we can use what has been
 +                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
 +                    updateConcurrencyFactor();
                  }
 -                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +                sentQueryIterator = sendNextRequests();
 +            }
  
 -                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 -                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
 -                {
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 -                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
 +            return sentQueryIterator.next();
 +        }
  
 -                    try
 -                    {
 -                        for (Row row : handler.get())
 -                        {
 -                            rows.add(row);
 -                            if (countLiveRows)
 -                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 -                        }
 -                        repairResponses.addAll(resolver.repairResults);
 -                    }
 -                    catch (ReadTimeoutException|ReadFailureException ex)
 -                    {
 -                        // we timed out or failed waiting for responses
 -                        int blockFor = consistency_level.blockFor(keyspace);
 -                        int responseCount = resolver.responses.size();
 -                        String gotData = responseCount > 0
 -                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                         : "";
 -
 -                        boolean isTimeout = ex instanceof ReadTimeoutException;
 -                        if (Tracing.isTracing())
 -                        {
 -                            Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
 -                                          (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        else if (logger.isDebugEnabled())
 -                        {
 -                            logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
 -                                         (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        throw ex;
 -                    }
 -                    catch (DigestMismatchException e)
 -                    {
 -                        throw new AssertionError(e); // no digests in range slices yet
 -                    }
 +        private void updateConcurrencyFactor()
 +        {
 +            if (liveReturned == 0)
 +            {
 +                // we haven't actually gotten any results, so query all remaining ranges at once
 +                concurrencyFactor = totalRangeCount - rangesQueried;
 +                return;
 +            }
  
 -                    // if we're done, great, otherwise, move to the next range
 -                    int count = countLiveRows ? liveRowCount : rows.size();
 -                    if (count >= rowsToBeFetched)
 -                    {
 -                        haveSufficientRows = true;
 -                        break;
 -                    }
 -                }
 +            // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
 +            // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
 +            int remainingRows = command.limits().count() - liveReturned;
 +            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
 +            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
-             logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
++            logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 +                         rowsPerRange, (int) remainingRows, concurrencyFactor);
 +        }
  
 -                try
 -                {
 -                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
 -                }
 -                catch (TimeoutException ex)
 -                {
 -                    // We got all responses, but timed out while repairing
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    if (Tracing.isTracing())
 -                        Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    else
 -                        logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
 -                }
 +        private SingleRangeResponse query(RangeForQuery toQuery)
 +        {
 +            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
 +
 +            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
 +
 +            int blockFor = consistency.blockFor(keyspace);
 +            int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
 +            List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
 +            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
  
 -                if (haveSufficientRows)
 -                    return command.postReconciliationProcessing(rows);
 +            handler.assureSufficientLiveNodes();
  
 -                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
 -                // based on the results we've seen so far (as long as we still have ranges left to query)
 -                if (i < ranges.size())
 +            if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
 +            {
 +                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get());
 +            }
 +            else
 +            {
 +                for (InetAddress endpoint : toQuery.filteredEndpoints)
                  {
 -                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
 -                    float remainingRows = rowsToBeFetched - fetchedRows;
 -                    float actualRowsPerRange;
 -                    if (fetchedRows == 0.0)
 -                    {
 -                        // we haven't actually gotten any results, so query all remaining ranges at once
 -                        actualRowsPerRange = 0.0f;
 -                        concurrencyFactor = ranges.size() - i;
 -                    }
 -                    else
 -                    {
 -                        actualRowsPerRange = fetchedRows / i;
 -                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
 -                    }
 -                    logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 -                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
 +                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
 +                    Tracing.trace("Enqueuing request to {}", endpoint);
 +                    MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                  }
              }
 +
 +            return new SingleRangeResponse(handler);
          }
 -        finally
 +
 +        private CountingPartitionIterator sendNextRequests()
 +        {
 +            List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
 +            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
 +            {
 +                concurrentQueries.add(query(ranges.next()));
 +                ++rangesQueried;
 +            }
 +
 +            Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
 +            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
 +            // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
 +            return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec());
 +        }
 +
 +        public void close()
          {
 -            long latency = System.nanoTime() - startTime;
 -            rangeMetrics.addNano(latency);
 -            Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            try
 +            {
 +                if (sentQueryIterator != null)
 +                    sentQueryIterator.close();
 +            }
 +            finally
 +            {
 +                long latency = System.nanoTime() - startTime;
 +                rangeMetrics.addNano(latency);
 +                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            }
          }
 -        return command.postReconciliationProcessing(rows);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
 +    throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    {
 +        Tracing.trace("Computing ranges to query");
 +
 +        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
 +        RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
 +
 +        // our estimate of how many result rows there will be per-range
 +        float resultsPerRange = estimateResultsPerRange(command, keyspace);
 +        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 +        // fetch enough rows in the first round
 +        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 +        int concurrencyFactor = resultsPerRange == 0.0
 +                              ? 1
 +                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
-         logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
++        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 +                     resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
 +        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 +
 +        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
 +
 +        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
      }
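
To make the concurrency-factor arithmetic in updateConcurrencyFactor() and
getRangeSlice() concrete, a small worked example with invented numbers:

public class ConcurrencyFactorExample
{
    public static void main(String[] args)
    {
        // Invented inputs: 100 requested rows, 20 (sub)ranges, and a first
        // batch of 4 ranges that returned 10 live rows in total.
        int limit = 100, totalRangeCount = 20;
        int liveReturned = 10, rangesQueried = 4;

        // Same formula as RangeCommandIterator.updateConcurrencyFactor():
        int remainingRows = limit - liveReturned;                  // 90
        float rowsPerRange = (float) liveReturned / rangesQueried; // 2.5
        int concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried,
                                                     Math.round(remainingRows / rowsPerRange)));
        System.out.println(concurrencyFactor); // min(16, 36) = 16: query the rest at once
    }
}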
  
      public Map<String, List<String>> getSchemaVersions()
@@@ -2430,52 -2287,6 +2430,52 @@@
              logger.warn("Some hints were not written before shutdown.  This is not supposed to happen.  You should (a) run repair, and (b) file a bug report");
      }
  
 +    private static AtomicInteger getHintsInProgressFor(InetAddress destination)
 +    {
 +        try
 +        {
 +            return hintsInProgress.load(destination);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +    }
 +
 +    public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler)
 +    {
 +        return submitHint(mutation, Collections.singleton(target), responseHandler);
 +    }
 +
 +    public static Future<Void> submitHint(Mutation mutation,
 +                                          Collection<InetAddress> targets,
 +                                          AbstractWriteResponseHandler<IMutation> responseHandler)
 +    {
 +        HintRunnable runnable = new HintRunnable(targets)
 +        {
 +            public void runMayThrow()
 +            {
-                 logger.debug("Adding hints for {}", targets);
++                logger.trace("Adding hints for {}", targets);
 +                HintsService.instance.write(Iterables.transform(targets, StorageService.instance::getHostIdForEndpoint),
 +                                            Hint.create(mutation, System.currentTimeMillis()));
 +                targets.forEach(HintsService.instance.metrics::incrCreatedHints);
 +                // Notify the handler only for CL == ANY
 +                if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
 +                    responseHandler.response(null);
 +            }
 +        };
 +
 +        return submitHint(runnable);
 +    }
 +
 +    private static Future<Void> submitHint(HintRunnable runnable)
 +    {
 +        StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
 +        for (InetAddress target : runnable.targets)
 +            getHintsInProgressFor(target).incrementAndGet();
 +        return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
 +    }
 +
      public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
      public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7017bc1,1abc928..538d128
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -688,10 -538,10 +688,10 @@@ public class CassandraServer implement
              // request by page if this is a large row
              if (cfs.getMeanColumns() > 0)
              {
 -                int averageColumnSize = (int) (cfs.metric.meanRowSize.getValue() / cfs.getMeanColumns());
 +                int averageColumnSize = (int) (cfs.metric.meanPartitionSize.getValue() / cfs.getMeanColumns());
                  pageSize = Math.min(COUNT_PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize);
                  pageSize = Math.max(2, pageSize);
-                 logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+                 logger.trace("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
              }
              else
              {
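
The paging heuristic in the hunk above amounts to "about 4 MB of columns per
page, clamped to [2, COUNT_PAGE_SIZE]". A worked sketch with invented sizes
(the COUNT_PAGE_SIZE value is assumed here for illustration):

public class PageSizeExample
{
    static final int COUNT_PAGE_SIZE = 1024; // assumed value, for illustration only

    public static void main(String[] args)
    {
        // Invented inputs: mean partition size of 1 MB spread over 5000 columns.
        long meanPartitionSize = 1024 * 1024;
        int meanColumns = 5000;

        int averageColumnSize = (int) (meanPartitionSize / meanColumns); // ~209 bytes
        int pageSize = Math.min(COUNT_PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize);
        pageSize = Math.max(2, pageSize);
        System.out.println(pageSize); // 1024: capped by COUNT_PAGE_SIZE here
    }
}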
@@@ -1866,9 -1492,10 +1866,9 @@@
          requestScheduler.release();
      }
  
 -    public String system_add_column_family(CfDef cf_def)
 -    throws InvalidRequestException, SchemaDisagreementException, TException
 +    public String system_add_column_family(CfDef cf_def) throws TException
      {
-         logger.debug("add_column_family");
+         logger.trace("add_column_family");
  
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------