Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/12 13:47:16 UTC

[2/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c85749e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c85749e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c85749e2

Branch: refs/heads/trunk
Commit: c85749e2bc5c65a03b00994565cd0b6b1a642e65
Parents: 2a828af e86d531
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Aug 12 15:34:01 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Aug 12 15:37:13 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 49 +++++++++++++-------
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3ce9f9e,ddc6720..959b967
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,39 -1,5 +1,40 @@@
 -2.2.8
 +3.0.9
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flaky BlacklistingCompactionsTest, switched to fixed-size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to a driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerException when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Update StorageProxy range metrics for timeouts, failures and unavailables (CASSANDRA-9507)
   * Add Sigar to classes included in clientutil.jar (CASSANDRA-11635)
   * Add decay to histograms and timers used for metrics (CASSANDRA-11752)
   * Fix hanging stream session (CASSANDRA-10992)

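For context on the change being merged (CASSANDRA-9507, the first 2.2 entry
above): the range-read path in StorageProxy now marks the client-request
range metrics before rethrowing, so coordinator-side timeouts, failures and
unavailables on range queries are counted instead of silently propagated.
Below is a minimal, self-contained sketch of that mark-and-rethrow pattern,
using the Dropwizard Meter that backs Cassandra's ClientRequestMetrics;
doRangeRead() and the use of TimeoutException are simplified stand-ins for
illustration, not the real API.

import com.codahale.metrics.Meter;
import java.util.concurrent.TimeoutException;

public class RangeMetricsSketch
{
    // Simplified stand-in for one of the meters on ClientRequestMetrics.
    static final Meter timeouts = new Meter();

    static String readRange() throws TimeoutException
    {
        try
        {
            return doRangeRead();
        }
        catch (TimeoutException e)
        {
            timeouts.mark(); // record the failure mode...
            throw e;         // ...but still surface the exception to the caller
        }
    }

    // Hypothetical stand-in for the actual range read.
    static String doRangeRead() throws TimeoutException
    {
        return "row";
    }
}

The new catch blocks in RangeCommandIterator.computeNext() below apply
exactly this pattern for UnavailableException, ReadTimeoutException and
ReadFailureException.
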
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 483da67,7b7979d..8a151f2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1955,201 -1716,267 +1955,218 @@@ public class StorageProxy implements St
          }
      }
  
 -    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
 -    throws UnavailableException, ReadFailureException, ReadTimeoutException
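  +    // Iterator over the response to a single (merged) range query: blocks on
  +    // the ReadCallback on first access, then streams the resolved partitions.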
 +    private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        Tracing.trace("Computing ranges to query");
 -        long startTime = System.nanoTime();
 +        private final ReadCallback handler;
 +        private PartitionIterator result;
  
 -        Keyspace keyspace = Keyspace.open(command.keyspace);
 -        List<Row> rows;
 -        // now scan until we have enough results
 -        try
 +        private SingleRangeResponse(ReadCallback handler)
          {
 -            int liveRowCount = 0;
 -            boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
 -            rows = new ArrayList<>();
 +            this.handler = handler;
 +        }
  
 -            // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
 -            // expensive in clusters with vnodes)
 -            List<? extends AbstractBounds<RowPosition>> ranges;
 -            if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
 -                ranges = command.keyRange.unwrap();
 -            else
 -                ranges = getRestrictedRanges(command.keyRange);
 -
 -            // determine the number of rows to be fetched and the concurrency factor
 -            int rowsToBeFetched = command.limit();
 -            int concurrencyFactor;
 -            if (command.requiresScanningAllRanges())
 -            {
 -                // all nodes must be queried
 -                rowsToBeFetched *= ranges.size();
 -                concurrencyFactor = ranges.size();
 -                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
 -                              ranges.size(), concurrencyFactor);
 +        private void waitForResponse() throws ReadTimeoutException
 +        {
 +            if (result != null)
 +                return;
 +
 +            try
 +            {
 +                result = handler.get();
              }
 -            else
 +            catch (DigestMismatchException e)
              {
 -                // our estimate of how many result rows there will be per-range
 -                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
 -                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 -                // fetch enough rows in the first round
 -                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 -                concurrencyFactor = resultRowsPerRange == 0.0
 -                                  ? 1
 -                                  : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
 -
 -                logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             resultRowsPerRange,
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
 -                              ranges.size(),
 -                              concurrencyFactor,
 -                              resultRowsPerRange);
 -            }
 -
 -            boolean haveSufficientRows = false;
 -            int i = 0;
 -            AbstractBounds<RowPosition> nextRange = null;
 -            List<InetAddress> nextEndpoints = null;
 -            List<InetAddress> nextFilteredEndpoints = null;
 -            while (i < ranges.size())
 -            {
 -                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 -                int concurrentFetchStartingIndex = i;
 -                int concurrentRequests = 0;
 -                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
 -                {
 -                    AbstractBounds<RowPosition> range = nextRange == null
 -                                                      ? ranges.get(i)
 -                                                      : nextRange;
 -                    List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                    : nextEndpoints;
 -                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                        : nextFilteredEndpoints;
 -                    ++i;
 -                    ++concurrentRequests;
 -
 -                    // getRestrictedRanges has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                    while (i < ranges.size())
 -                    {
 -                        nextRange = ranges.get(i);
 -                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                        // doesn't know how to deal with a wrapping range.
 -                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
 -                        // the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
 -                        // wire compatibility, so it's likely easier not to bother.
 -                        if (range.right.isMinimum())
 -                            break;
 -
 -                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                        // Check if there are enough endpoints for the merge to be possible.
 -                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                            break;
 -
 -                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                        // Estimate whether merging will be a win or not
 -                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                            break;
 -
 -                        // If we get there, merge this range and the next one
 -                        range = range.withNewRight(nextRange.right);
 -                        liveEndpoints = merged;
 -                        filteredEndpoints = filteredMerged;
 -                        ++i;
 -                    }
 -
 -                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +                throw new AssertionError(e); // no digests in range slices yet
 +            }
 +        }
  
 -                    // collect replies and resolve according to consistency level
 -                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                    handler.assureSufficientLiveNodes();
 -                    resolver.setSources(filteredEndpoints);
 -                    if (filteredEndpoints.size() == 1
 -                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
 -                    }
 -                    else
 -                    {
 -                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                        for (InetAddress endpoint : filteredEndpoints)
 -                        {
 -                            Tracing.trace("Enqueuing request to {}", endpoint);
 -                            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 -                        }
 -                    }
 -                    scanHandlers.add(Pair.create(nodeCmd, handler));
 -                }
 -                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +        protected RowIterator computeNext()
 +        {
 +            waitForResponse();
 +            return result.hasNext() ? result.next() : endOfData();
 +        }
  
 -                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 -                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
 -                {
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 -                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
 +        public void close()
 +        {
 +            if (result != null)
 +                result.close();
 +        }
 +    }
  
 -                    try
 -                    {
 -                        for (Row row : handler.get())
 -                        {
 -                            rows.add(row);
 -                            if (countLiveRows)
 -                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 -                        }
 -                        repairResponses.addAll(resolver.repairResults);
 -                    }
 -                    catch (ReadTimeoutException|ReadFailureException ex)
 -                    {
 -                        // we timed out or failed waiting for responses
 -                        int blockFor = consistency_level.blockFor(keyspace);
 -                        int responseCount = resolver.responses.size();
 -                        String gotData = responseCount > 0
 -                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                         : "";
 -
 -                        boolean isTimeout = ex instanceof ReadTimeoutException;
 -                        if (Tracing.isTracing())
 -                        {
 -                            Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
 -                                          (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        else if (logger.isDebugEnabled())
 -                        {
 -                            logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
 -                                         (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        throw ex;
 -                    }
 -                    catch (DigestMismatchException e)
 -                    {
 -                        throw new AssertionError(e); // no digests in range slices yet
 -                    }
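  +    // Drives the whole range read: pulls (merged) sub-ranges from RangeMerger,
  +    // queries them in adaptively sized concurrent batches, and marks the range
  +    // metrics on unavailability, timeout or failure.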
 +    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
 +    {
 +        private final Iterator<RangeForQuery> ranges;
 +        private final int totalRangeCount;
 +        private final PartitionRangeReadCommand command;
 +        private final Keyspace keyspace;
 +        private final ConsistencyLevel consistency;
  
 -                    // if we're done, great; otherwise, move to the next range
 -                    int count = countLiveRows ? liveRowCount : rows.size();
 -                    if (count >= rowsToBeFetched)
 -                    {
 -                        haveSufficientRows = true;
 -                        break;
 -                    }
 -                }
 +        private final long startTime;
 +        private DataLimits.Counter counter;
 +        private PartitionIterator sentQueryIterator;
  
 -                try
 -                {
 -                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
 -                }
 -                catch (TimeoutException ex)
 -                {
 -                    // We got all responses, but timed out while repairing
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    if (Tracing.isTracing())
 -                        Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    else
 -                        logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
 -                }
 +        private int concurrencyFactor;
 +        // The following two metrics are maintained to improve the concurrencyFactor
 +        // when the initial estimate was not good enough.
 +        private int liveReturned;
 +        private int rangesQueried;
  
 -                if (haveSufficientRows)
 -                    return command.postReconciliationProcessing(rows);
 +        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
 +        {
 +            this.command = command;
 +            this.concurrencyFactor = concurrencyFactor;
 +            this.startTime = System.nanoTime();
 +            this.ranges = new RangeMerger(ranges, keyspace, consistency);
 +            this.totalRangeCount = ranges.rangeCount();
 +            this.consistency = consistency;
 +            this.keyspace = keyspace;
 +        }
  
 -                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
 -                // based on the results we've seen so far (as long as we still have ranges left to query)
 -                if (i < ranges.size())
 +        public RowIterator computeNext()
 +        {
-             while (sentQueryIterator == null || !sentQueryIterator.hasNext())
++            try
 +            {
-                 // If we don't have more ranges to handle, we're done
-                 if (!ranges.hasNext())
-                     return endOfData();
- 
-                 // else, send the next batch of concurrent queries (after having closed the previous iterator)
-                 if (sentQueryIterator != null)
++                while (sentQueryIterator == null || !sentQueryIterator.hasNext())
                  {
-                     liveReturned += counter.counted();
-                     sentQueryIterator.close();
 -                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
 -                    float remainingRows = rowsToBeFetched - fetchedRows;
 -                    float actualRowsPerRange;
 -                    if (fetchedRows == 0.0)
 -                    {
 -                        // we haven't actually gotten any results, so query all remaining ranges at once
 -                        actualRowsPerRange = 0.0f;
 -                        concurrencyFactor = ranges.size() - i;
 -                    }
 -                    else
++                    // If we don't have more ranges to handle, we're done
++                    if (!ranges.hasNext())
++                        return endOfData();
 +
-                     // It's not the first batch of queries and we're not done, so we can use what has been
-                     // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
-                     updateConcurrencyFactor();
++                    // else, send the next batch of concurrent queries (after having closed the previous iterator)
++                    if (sentQueryIterator != null)
+                     {
 -                        actualRowsPerRange = fetchedRows / i;
 -                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
++                        liveReturned += counter.counted();
++                        sentQueryIterator.close();
++
++                        // It's not the first batch of queries and we're not done, so we can use what has been
++                        // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
++                        updateConcurrencyFactor();
+                     }
 -                    logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 -                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
++                    sentQueryIterator = sendNextRequests();
                  }
-                 sentQueryIterator = sendNextRequests();
-             }
 +
-             return sentQueryIterator.next();
++                return sentQueryIterator.next();
++            }
++            catch (UnavailableException e)
++            {
++                rangeMetrics.unavailables.mark();
++                throw e;
++            }
++            catch (ReadTimeoutException e)
++            {
++                rangeMetrics.timeouts.mark();
++                throw e;
++            }
++            catch (ReadFailureException e)
++            {
++                rangeMetrics.failures.mark();
++                throw e;
+             }
          }
 -        catch (ReadTimeoutException e)
 +
 +        private void updateConcurrencyFactor()
          {
 -            rangeMetrics.timeouts.mark();
 -            throw e;
 +            if (liveReturned == 0)
 +            {
 +                // we haven't actually gotten any results, so query all remaining ranges at once
 +                concurrencyFactor = totalRangeCount - rangesQueried;
 +                return;
 +            }
 +
 +            // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
 +            // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
 +            int remainingRows = command.limits().count() - liveReturned;
 +            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
 +            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
 +            logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 +                         rowsPerRange, (int) remainingRows, concurrencyFactor);
          }
 -        catch (UnavailableException e)
 +
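 +        // Sends the query for one sub-range and wraps the callback in a
 +        // SingleRangeResponse; reads locally when this node is the only target.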
 +        private SingleRangeResponse query(RangeForQuery toQuery)
          {
 -            rangeMetrics.unavailables.mark();
 -            throw e;
 +            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
 +
 +            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
 +
 +            int blockFor = consistency.blockFor(keyspace);
 +            int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
 +            List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
 +            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
 +
 +            handler.assureSufficientLiveNodes();
 +
 +            if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
 +            {
 +                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
 +            }
 +            else
 +            {
 +                for (InetAddress endpoint : toQuery.filteredEndpoints)
 +                {
 +                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
 +                    Tracing.trace("Enqueuing request to {}", endpoint);
 +                    MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 +                }
 +            }
 +
 +            return new SingleRangeResponse(handler);
          }
 -        catch (ReadFailureException e)
 +
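 +        // Sends up to concurrencyFactor sub-range queries and concatenates
 +        // their results into a single, counted PartitionIterator.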
 +        private PartitionIterator sendNextRequests()
          {
 -            rangeMetrics.failures.mark();
 -            throw e;
 +            List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
 +            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
 +            {
 +                concurrentQueries.add(query(ranges.next()));
 +                ++rangesQueried;
 +            }
 +
 +            Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
 +            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
 +            // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
 +            counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
 +            return counter.applyTo(PartitionIterators.concat(concurrentQueries));
          }
 -        finally
 +
 +        public void close()
          {
 -            long latency = System.nanoTime() - startTime;
 -            rangeMetrics.addNano(latency);
 -            Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            try
 +            {
 +                if (sentQueryIterator != null)
 +                    sentQueryIterator.close();
 +            }
 +            finally
 +            {
 +                long latency = System.nanoTime() - startTime;
 +                rangeMetrics.addNano(latency);
 +                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            }
          }
 -        return command.postReconciliationProcessing(rows);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
-     throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    {
 +        Tracing.trace("Computing ranges to query");
 +
 +        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
 +        RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
 +
 +        // our estimate of how many result rows there will be per-range
 +        float resultsPerRange = estimateResultsPerRange(command, keyspace);
 +        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 +        // fetch enough rows in the first round
 +        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 +        int concurrencyFactor = resultsPerRange == 0.0
 +                              ? 1
 +                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
 +        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 +                     resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
 +        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 +
 +        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
 +
 +        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
      }
  
      public Map<String, List<String>> getSchemaVersions()
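
----------------------------------------------------------------------

The adaptive batch sizing above is easiest to verify with concrete numbers.
The sketch below reproduces the two computations from getRangeSlice() and
updateConcurrencyFactor() as standalone methods; the 0.10 margin stands in
for CONCURRENT_SUBREQUESTS_MARGIN (its exact value is an assumption here),
while the formulas are taken directly from the diff.

public class ConcurrencyFactorSketch
{
    // Initial estimate, as in getRangeSlice().
    static int initialFactor(float resultsPerRange, int rangeCount, int limit)
    {
        // underestimate rows-per-range so the first round likely fetches enough
        resultsPerRange -= resultsPerRange * 0.10f;
        return resultsPerRange == 0.0f
             ? 1
             : Math.max(1, Math.min(rangeCount, (int) Math.ceil(limit / resultsPerRange)));
    }

    // Re-estimate between batches, as in updateConcurrencyFactor().
    static int updatedFactor(int totalRangeCount, int rangesQueried, int liveReturned, int limit)
    {
        if (liveReturned == 0)
            return totalRangeCount - rangesQueried; // no data yet: query everything left

        int remainingRows = limit - liveReturned;
        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
        return Math.max(1, Math.min(totalRangeCount - rangesQueried,
                                    Math.round(remainingRows / rowsPerRange)));
    }

    public static void main(String[] args)
    {
        // 256 vnode ranges, ~40 rows estimated per range, LIMIT 100:
        System.out.println(initialFactor(40f, 256, 100)); // 3 ranges in batch one
        // batch one returned only 30 live rows from those 3 ranges:
        System.out.println(updatedFactor(256, 3, 30, 100)); // next batch grows to 7
    }
}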