Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/09 18:19:48 UTC

[1/2] cassandra git commit: Fix DISTINCT queries w/ limits/paging and tombstoned partitions

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 14b2d7a16 -> 7f62e2928


Fix DISTINCT queries w/ limits/paging and tombstoned partitions

Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8490

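For context: the bug here is that a partition containing only tombstones still
counted against a DISTINCT query's limit even though it contributes no result,
so a LIMIT or page could fill up with invisible partitions and drop live ones.
A toy simulation of that failure mode (hypothetical standalone Java, not
Cassandra code; the partition layout and method names are made up):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical simulation of the CASSANDRA-8490 failure mode. Each
    // partition is either live or contains only tombstones; the scan stops
    // once `counted` reaches the limit.
    public class DistinctLimitSimulation
    {
        static List<String> distinctWithLimit(boolean[] isLive, int limit, boolean ignoreTombstonedPartitions)
        {
            List<String> results = new ArrayList<>();
            int counted = 0;
            for (int pk = 0; pk < isLive.length && counted < limit; pk++)
            {
                if (isLive[pk])
                    results.add("pk" + pk);               // only live partitions produce results
                if (isLive[pk] || !ignoreTombstonedPartitions)
                    counted++;                            // old behavior also counts tombstone-only partitions
            }
            return results;
        }

        public static void main(String[] args)
        {
            boolean[] partitions = { true, false, false, true, true };   // false = only tombstones
            System.out.println(distinctWithLimit(partitions, 3, false)); // before the fix: [pk0]
            System.out.println(distinctWithLimit(partitions, 3, true));  // after the fix:  [pk0, pk3, pk4]
        }
    }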

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd62f7bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd62f7bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd62f7bf

Branch: refs/heads/cassandra-2.1
Commit: dd62f7bf7977dd40eedb1c81ab7900b778f84540
Parents: ed54e80
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 9 11:14:54 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 11:14:54 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                            |  2 ++
 .../cassandra/cql3/statements/SelectStatement.java     |  6 +++++-
 .../org/apache/cassandra/db/AbstractRangeCommand.java  | 13 +++++++++++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java     |  4 +++-
 src/java/org/apache/cassandra/db/DataRange.java        | 12 ++++++++++++
 .../org/apache/cassandra/db/filter/ExtendedFilter.java |  6 ++++++
 .../apache/cassandra/db/filter/SliceQueryFilter.java   |  6 ++++++
 .../org/apache/cassandra/service/StorageProxy.java     | 13 +++++++------
 8 files changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index adb374a..0c7e9a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.12:
+ * Fix DISTINCT queries with LIMITs or paging when some partitions
+   contain only tombstones (CASSANDRA-8490)
  * Introduce background cache refreshing to permissions cache
    (CASSANDRA-8194)
  * Fix race condition in StreamTransferTask that could lead to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f08f6b8..19615b6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -450,7 +450,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             // For distinct, we only care about fetching the beginning of each partition. If we don't have
             // static columns, we in fact only care about the first cell, so we query only that (we don't "group").
             // If we do have static columns, we do need to fetch the first full group (to have the static columns values).
-            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selectsStaticColumns ? toGroup : -1);
+
+            // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for
+            // DISTINCT queries on the partition key only.
+            toGroup = selectsStaticColumns ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup);
         }
         else if (isColumnRange())
         {

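A note on the values compositesToGroup can now take in this code path: -1
already meant "don't group cells at all" (the value replaced above),
non-negative values roughly mean "group cells into rows by that many composite
components", and -2 is the new sentinel. A purely illustrative helper, not
part of the patch:

    // Illustrative only: how the three kinds of compositesToGroup values
    // read after this patch. -2 mirrors the new
    // SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS constant.
    static String describeCompositesToGroup(int compositesToGroup)
    {
        if (compositesToGroup == -2)
            return "DISTINCT on the partition key only: skip tombstone-only partitions";
        if (compositesToGroup == -1)
            return "no grouping of cells; fetch the first cell only";
        return "group cells by the first " + compositesToGroup + " composite components";
    }
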
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
index 45302e2..4ddcb8d 100644
--- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -57,6 +57,19 @@ public abstract class AbstractRangeCommand implements IReadCommand
 
     public abstract int limit();
     public abstract boolean countCQL3Rows();
+
+    /**
+     * Returns true if tombstoned partitions should not be included in results or count towards the limit.
+     * See CASSANDRA-8490 for more details on why this is needed (and done this way).
+     */
+    public boolean ignoredTombstonedPartitions()
+    {
+        if (!(predicate instanceof SliceQueryFilter))
+            return false;
+
+        return ((SliceQueryFilter) predicate).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+    }
+
     public abstract List<Row> executeLocally();
 
     public long getTimeout()

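DataRange below adds the same guard for the range-scan path. Stated on its
own, the pattern is (hypothetical standalone sketch, assuming the patched
classes are on the classpath):

    import org.apache.cassandra.db.filter.IDiskAtomFilter;
    import org.apache.cassandra.db.filter.SliceQueryFilter;

    // Hypothetical standalone version of the guard shared by
    // AbstractRangeCommand and DataRange: only a SliceQueryFilter whose
    // compositesToGroup equals the sentinel opts into the new behavior.
    final class TombstonedPartitionGuard
    {
        static boolean ignoresTombstonedPartitions(IDiskAtomFilter filter)
        {
            return filter instanceof SliceQueryFilter
                && ((SliceQueryFilter) filter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
        }
    }
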
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7bd2a59..e936473 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1749,6 +1749,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         List<Row> rows = new ArrayList<Row>();
         int columnsCount = 0;
         int total = 0, matched = 0;
+        boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions();
 
         try
         {
@@ -1784,7 +1785,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 }
 
                 rows.add(new Row(rawRow.key, data));
-                matched++;
+                if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp))
+                    matched++;
 
                 if (data != null)
                     columnsCount += filter.lastCounted(data);

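Note what this hunk does and does not change: tombstone-only rows are still
added to `rows` (the coordinator still sees them); they just stop advancing
`matched`. A hypothetical mini-model of the new accounting:

    // Hypothetical mini-model of the `matched` accounting above: partitions
    // with no live cells at the query timestamp no longer advance the count.
    public class MatchedCountExample
    {
        public static void main(String[] args)
        {
            int[] liveCellsPerPartition = { 2, 0, 1 };   // 0 = only tombstones
            boolean ignoreTombstonedPartitions = true;   // DISTINCT-on-partition-key mode
            int matched = 0;
            for (int liveCells : liveCellsPerPartition)
            {
                boolean hasOnlyTombstones = liveCells == 0;
                if (!ignoreTombstonedPartitions || !hasOnlyTombstones)
                    matched++;
            }
            System.out.println(matched);   // prints 2 (would be 3 under the old accounting)
        }
    }
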
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index b8b8daf..774a3aa 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -87,6 +87,18 @@ public class DataRange
         return keyRange.right;
     }
 
+    /**
+     * Returns true if tombstoned partitions should not be included in results or count towards the limit.
+     * See CASSANDRA-8490 for more details on why this is needed (and done this way).
+     */
+    public boolean ignoredTombstonedPartitions()
+    {
+        if (!(columnFilter instanceof SliceQueryFilter))
+            return false;
+
+        return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+    }
+
     // Whether the bounds of this DataRange actually wraps around.
     public boolean isWrapAround()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 5c3662b..82e889d 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -127,6 +127,12 @@ public abstract class ExtendedFilter
      */
     public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
 
+    /** Returns true if tombstoned partitions should not be included in results or count towards the limit, false otherwise. */
+    public boolean ignoreTombstonedPartitions()
+    {
+        return dataRange.ignoredTombstonedPartitions();
+    }
+
     /**
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 58a0303..858578f 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -42,6 +42,12 @@ public class SliceQueryFilter implements IDiskAtomFilter
     private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
     public static final Serializer serializer = new Serializer();
 
+    /**
+     * A special value for compositesToGroup that indicates that tombstoned partitions should not be included in results
+     * or count towards the limit.  See CASSANDRA-8490 for more details on why this is needed (and done this way).
+     */
+    public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
+
     public final ColumnSlice[] slices;
     public final boolean reversed;
     public volatile int count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1e1a2a3..45af1c8 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1499,7 +1499,8 @@ public class StorageProxy implements StorageProxyMBean
         // now scan until we have enough results
         try
         {
-            int cql3RowCount = 0;
+            int liveRowCount = 0;
+            boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
             rows = new ArrayList<>();
 
             // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
@@ -1594,8 +1595,8 @@ public class StorageProxy implements StorageProxyMBean
                     for (Row row : handler.get())
                     {
                         rows.add(row);
-                        if (nodeCmd.countCQL3Rows())
-                            cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
+                        if (countLiveRows)
+                            liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
@@ -1636,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // if we're done, great, otherwise, move to the next range
-                int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+                int count = countLiveRows ? liveRowCount : rows.size();
                 if (count >= nodeCmd.limit())
                     break;
             }
@@ -1652,8 +1653,8 @@ public class StorageProxy implements StorageProxyMBean
 
     private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
     {
-        // When maxIsColumns, we let the caller trim the result.
-        if (command.countCQL3Rows())
+        // for CQL3 queries, let the caller trim the results
+        if (command.countCQL3Rows() || command.ignoredTombstonedPartitions())
             return rows;
         else
             return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;

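The trim() change deserves a closing remark: when ignoredTombstonedPartitions()
is set, the coordinator's row list may still contain tombstone-only rows that
the CQL layer discards later, so trimming by raw size here could cut live
rows. A hypothetical illustration (plain Java, not Cassandra code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical illustration of why trim() defers to the caller: trimming
    // [live, tombstoned, live] to a limit of 2 by raw size drops a live row.
    public class TrimExample
    {
        public static void main(String[] args)
        {
            List<String> rows = Arrays.asList("live:pk0", "tombstoned:pk1", "live:pk2");
            int limit = 2;

            // Raw-size trim keeps [live:pk0, tombstoned:pk1]: one live row survives.
            System.out.println(rows.subList(0, limit));

            // Deferring lets the upper layer drop tombstoned rows first: two live rows.
            System.out.println(rows.stream()
                                   .filter(r -> r.startsWith("live"))
                                   .limit(limit)
                                   .collect(Collectors.toList()));
        }
    }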

[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/service/StorageProxy.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f62e292
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f62e292
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f62e292

Branch: refs/heads/cassandra-2.1
Commit: 7f62e292867bb6159592bfc8b0423f89f518a2b5
Parents: 14b2d7a dd62f7b
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 9 11:19:37 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 11:19:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                          |  2 ++
 .../cassandra/cql3/statements/SelectStatement.java   |  6 +++++-
 .../apache/cassandra/db/AbstractRangeCommand.java    | 13 +++++++++++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java   |  4 +++-
 src/java/org/apache/cassandra/db/DataRange.java      | 12 ++++++++++++
 .../apache/cassandra/db/filter/ExtendedFilter.java   |  6 ++++++
 .../apache/cassandra/db/filter/SliceQueryFilter.java |  6 ++++++
 .../org/apache/cassandra/service/StorageProxy.java   | 15 ++++++++-------
 8 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2028633,0c7e9a2..abe3fce
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,6 +1,58 @@@
 -2.0.12:
 +2.1.3
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fix the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * Add a check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+  * Fix DISTINCT queries with LIMITs or paging when some partitions
+    contain only tombstones (CASSANDRA-8490)
   * Introduce background cache refreshing to permissions cache
     (CASSANDRA-8194)
   * Fix race condition in StreamTransferTask that could lead to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 487c6f0,858578f..7508c4e
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@@ -44,7 -40,14 +44,13 @@@ import org.apache.cassandra.tracing.Tra
  public class SliceQueryFilter implements IDiskAtomFilter
  {
      private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
 -    public static final Serializer serializer = new Serializer();
  
+     /**
+      * A special value for compositesToGroup that indicates that tombstoned partitions should not be included in results
+      * or count towards the limit.  See CASSANDRA-8490 for more details on why this is needed (and done this way).
+      */
+     public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
+ 
      public final ColumnSlice[] slices;
      public final boolean reversed;
      public volatile int count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 780df21,45af1c8..72577a6
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1631,141 -1517,109 +1632,141 @@@ public class StorageProxy implements St
              List<InetAddress> nextFilteredEndpoints = null;
              while (i < ranges.size())
              {
 -                AbstractBounds<RowPosition> range = nextRange == null
 -                                                  ? ranges.get(i)
 -                                                  : nextRange;
 -                List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                : nextEndpoints;
 -                List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                    ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                    : nextFilteredEndpoints;
 -                ++i;
 -
 -                // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                while (i < ranges.size())
 +                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 +                int concurrentFetchStartingIndex = i;
 +                int concurrentRequests = 0;
 +                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
                  {
 -                    nextRange = ranges.get(i);
 -                    nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                    nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                    /*
 -                     * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                     * don't know how to deal with a wrapping range.
 -                     * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 -                     * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 -                     * wire compatibility, so It's likely easier not to bother;
 -                     */
 -                    if (range.right.isMinimum())
 -                        break;
 -
 -                    List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                    // Check if there is enough endpoint for the merge to be possible.
 -                    if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                        break;
 -
 -                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                    // Estimate whether merging will be a win or not
 -                    if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                        break;
 -
 -                    // If we get there, merge this range and the next one
 -                    range = range.withNewRight(nextRange.right);
 -                    liveEndpoints = merged;
 -                    filteredEndpoints = filteredMerged;
 +                    AbstractBounds<RowPosition> range = nextRange == null
 +                                                      ? ranges.get(i)
 +                                                      : nextRange;
 +                    List<InetAddress> liveEndpoints = nextEndpoints == null
 +                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 +                                                    : nextEndpoints;
 +                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 +                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 +                                                        : nextFilteredEndpoints;
                      ++i;
 -                }
 +                    ++concurrentRequests;
  
 -                AbstractRangeCommand nodeCmd = command.forSubRange(range);
 -
 -                // collect replies and resolve according to consistency level
 -                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                handler.assureSufficientLiveNodes();
 -                resolver.setSources(filteredEndpoints);
 -                if (filteredEndpoints.size() == 1
 -                    && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 -                    && OPTIMIZE_LOCAL_REQUESTS)
 -                {
 -                    StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
 -                }
 -                else
 -                {
 -                    MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                    for (InetAddress endpoint : filteredEndpoints)
 +                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 +                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 +                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 +                    while (i < ranges.size())
                      {
 -                        Tracing.trace("Enqueuing request to {}", endpoint);
 -                        MessagingService.instance().sendRR(message, endpoint, handler);
 +                        nextRange = ranges.get(i);
 +                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 +                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 +
 +                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 +                        // doesn't know how to deal with a wrapping range.
 +                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
 +                        // the range if necessary and deal with it. However, we can't start sending wrapped ranges without
 +                        // breaking wire compatibility, so it's likely easier not to bother.
 +                        if (range.right.isMinimum())
 +                            break;
 +
 +                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 +
 +                        // Check if there is enough endpoint for the merge to be possible.
 +                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 +                            break;
 +
 +                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 +
 +                        // Estimate whether merging will be a win or not
 +                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 +                            break;
 +
 +                        // If we get there, merge this range and the next one
 +                        range = range.withNewRight(nextRange.right);
 +                        liveEndpoints = merged;
 +                        filteredEndpoints = filteredMerged;
 +                        ++i;
                      }
 -                }
  
 -                try
 -                {
 -                    for (Row row : handler.get())
 +                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +
 +                    // collect replies and resolve according to consistency level
 +                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 +                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 +                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 +                    handler.assureSufficientLiveNodes();
 +                    resolver.setSources(filteredEndpoints);
 +                    if (filteredEndpoints.size() == 1
 +                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 +                        && OPTIMIZE_LOCAL_REQUESTS)
                      {
 -                        rows.add(row);
 -                        if (countLiveRows)
 -                            liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
                      }
 -                    FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 +                    else
 +                    {
 +                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 +                        for (InetAddress endpoint : filteredEndpoints)
 +                        {
 +                            Tracing.trace("Enqueuing request to {}", endpoint);
 +                            MessagingService.instance().sendRR(message, endpoint, handler);
 +                        }
 +                    }
 +                    scanHandlers.add(Pair.create(nodeCmd, handler));
                  }
 -                catch (ReadTimeoutException ex)
 +                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +
 +                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 +                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
                  {
 -                    // we timed out waiting for responses
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    int responseCount = resolver.responses.size();
 -                    String gotData = responseCount > 0
 -                                     ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                     : "";
 +                    AbstractRangeCommand nodeCmd = cmdPairHandler.left;
 +                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 +                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
  
 -                    if (Tracing.isTracing())
 +                    try
                      {
 -                        Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 -                                new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        for (Row row : handler.get())
 +                        {
 +                            rows.add(row);
-                             if (nodeCmd.countCQL3Rows())
-                                 cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
++                            if (countLiveRows)
++                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        }
 +                        repairResponses.addAll(resolver.repairResults);
                      }
 -                    else if (logger.isDebugEnabled())
 +                    catch (ReadTimeoutException ex)
                      {
 -                        logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 -                                responseCount, blockFor, gotData, i, ranges.size());
 +                        // we timed out waiting for responses
 +                        int blockFor = consistency_level.blockFor(keyspace);
 +                        int responseCount = resolver.responses.size();
 +                        String gotData = responseCount > 0
 +                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 +                                         : "";
 +
 +                        if (Tracing.isTracing())
 +                        {
 +                            Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 +                                          new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        }
 +                        else if (logger.isDebugEnabled())
 +                        {
 +                            logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 +                                         responseCount, blockFor, gotData, i, ranges.size());
 +                        }
 +                        throw ex;
                      }
 -                    throw ex;
 +                    catch (DigestMismatchException e)
 +                    {
 +                        throw new AssertionError(e); // no digests in range slices yet
 +                    }
 +
 +                    // if we're done, great, otherwise, move to the next range
-                     int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
++                    int count = countLiveRows ? liveRowCount : rows.size();
 +                    if (count >= nodeCmd.limit())
 +                    {
 +                        haveSufficientRows = true;
 +                        break;
 +                    }
 +                }
 +
 +                try
 +                {
 +                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
                  }
                  catch (TimeoutException ex)
                  {
@@@ -1777,31 -1631,15 +1778,31 @@@
                          logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
                      throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
                  }
 -                catch (DigestMismatchException e)
 +
 +                if (haveSufficientRows)
 +                    return trim(command, rows);
 +
 +                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
 +                // based on the results we've seen so far (as long as we still have ranges left to query)
 +                if (i < ranges.size())
                  {
-                     float fetchedRows = command.countCQL3Rows() ? cql3RowCount : rows.size();
 -                    throw new AssertionError(e); // no digests in range slices yet
++                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
 +                    float remainingRows = command.limit() - fetchedRows;
 +                    float actualRowsPerRange;
 +                    if (fetchedRows == 0.0)
 +                    {
 +                        // we haven't actually gotten any results, so query all remaining ranges at once
 +                        actualRowsPerRange = 0.0f;
 +                        concurrencyFactor = ranges.size() - i;
 +                    }
 +                    else
 +                    {
 +                        actualRowsPerRange = i / fetchedRows;
 +                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
 +                    }
 +                    logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 +                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
                  }
 -
 -                // if we're done, great, otherwise, move to the next range
 -                int count = countLiveRows ? liveRowCount : rows.size();
 -                if (count >= nodeCmd.limit())
 -                    break;
              }
          }
          finally
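
A closing note on the merged StorageProxy code: when a concurrent fetch comes
up short, the 2.1 branch recalculates concurrencyFactor from the rows seen so
far. Plugging hypothetical numbers into the arithmetic of the hunk above
(note that actualRowsPerRange, as written, is i / fetchedRows, i.e. ranges per
fetched row despite its name; the example follows the code verbatim):

    // Hypothetical values run through the recalculation shown above.
    public class ConcurrencyFactorExample
    {
        public static void main(String[] args)
        {
            int limit = 100, i = 3, totalRanges = 50;   // 3 of 50 ranges queried so far
            float fetchedRows = 30;                     // live rows seen so far
            float remainingRows = limit - fetchedRows;  // 70.0
            float actualRowsPerRange = i / fetchedRows; // 0.1
            int concurrencyFactor = Math.max(1, Math.min(totalRanges - i,
                                                         Math.round(remainingRows / actualRowsPerRange)));
            System.out.println(concurrencyFactor);      // min(47, 700) = 47: query all remaining ranges
        }
    }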