You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/19 19:02:44 UTC

cassandra git commit: Fix DISTINCT on static columns with paging

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 351e35b64 -> 2acbab651


Fix DISTINCT on static columns with paging

Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8087


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2acbab65
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2acbab65
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2acbab65

Branch: refs/heads/cassandra-2.0
Commit: 2acbab6516fc6c8666d3bbff99c49f04b8499235
Parents: 351e35b
Author: Tyler Hobbs <ty...@apache.org>
Authored: Fri Dec 19 12:02:00 2014 -0600
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Fri Dec 19 12:02:00 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../cql3/statements/ColumnGroupMap.java         | 27 ++++++++++++++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  6 ++--
 src/java/org/apache/cassandra/db/DataRange.java | 30 ++++++++++++++------
 .../apache/cassandra/db/PagedRangeCommand.java  | 13 +++------
 .../cassandra/db/filter/SliceQueryFilter.java   |  6 +++-
 6 files changed, 64 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ad2de5..bd128f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.12:
+ * Fix non-distinct results in DISTINCT queries on static columns when
+   paging is enabled (CASSANDRA-8087)
  * Move all hints related tasks to hints internal executor (CASSANDRA-8285)
  * Fix paging for multi-partition IN queries (CASSANDRA-8408)
  * Fix MOVED_NODE topology event never being emitted when a node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 58428ed..2fb9c5f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -24,10 +24,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Objects;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang3.StringUtils;
 
 public class ColumnGroupMap
 {
@@ -96,6 +99,24 @@ public class ColumnGroupMap
         return map.containsKey(key);
     }
 
+    @Override
+    public String toString()
+    {
+        List<String> fullStringPath = new ArrayList<>(fullPath.length);
+        for (ByteBuffer buffer : fullPath)
+            fullStringPath.add(ByteBufferUtil.bytesToHex(buffer));
+
+        List<String> stringMap = new ArrayList<>(fullPath.length);
+        for (Map.Entry<ByteBuffer, Value> entry : map.entrySet())
+            stringMap.add(ByteBufferUtil.bytesToHex(entry.getKey()) + ": " + entry.getValue());
+
+        return Objects.toStringHelper(this)
+                .add("fullPath", "[" + StringUtils.join(fullStringPath, ", ") + "]")
+                .add("map", "{" + StringUtils.join(stringMap, ", ") + "}")
+                .add("isStatic", isStatic)
+                .toString();
+    }
+
     private interface Value {};
 
     private static class Simple implements Value
@@ -106,6 +127,12 @@ public class ColumnGroupMap
         {
             this.column = column;
         }
+
+        @Override
+        public String toString()
+        {
+            return Objects.toStringHelper(this).add("column", ByteBufferUtil.bytesToHex(column.name())).toString();
+        }
     }
 
     private static class Collection extends ArrayList<Pair<ByteBuffer, Column>> implements Value {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0e7e5f2..7bd2a59 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1649,12 +1649,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * Allows generic range paging with the slice column filter.
      * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
-     * And suppose we want to page throught the query that for all rows returns the columns
+     * And suppose we want to page through the query that for all rows returns the columns
      * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
      * and ending at (row Z, column 75), *but* that only return columns in [25, 75].
      * That is what this method allows. The columnRange is the "window" of  columns we are interested
      * in each row, and columnStart (resp. columnEnd) is the start (resp. end) for the first
-     * (resp. end) requested row.
+     * (resp. last) requested row.
      */
     public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
                                              SliceQueryFilter columnRange,
@@ -1694,6 +1694,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             assert columnFilter instanceof SliceQueryFilter;
             SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
             assert sfilter.slices.length == 1;
+            // create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish
+            // through to DataRange.Paging to be used on the first and last partitions
             SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
             dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index b8e0bf5..b8b8daf 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -111,11 +111,19 @@ public class DataRange
         return selectFullRow;
     }
 
+    /**
+     * Returns a column filter that should be used for a particular row key.  Note that in the case of paging,
+     * slice starts and ends may change depending on the row key.
+     */
     public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
     {
         return columnFilter;
     }
 
+    /**
+     * Sets a new limit on the number of (grouped) cells to fetch. This is currently only used when the query limit applies
+     * to CQL3 rows.
+     */
     public void updateColumnsLimit(int count)
     {
         columnFilter.updateColumnsLimit(count);
@@ -123,12 +131,18 @@ public class DataRange
 
     public static class Paging extends DataRange
     {
+        // The slice of columns that we want to fetch for each row, ignoring page start/end issues.
         private final SliceQueryFilter sliceFilter;
+
         private final Comparator<ByteBuffer> comparator;
-        private final ByteBuffer columnStart;
-        private final ByteBuffer columnFinish;
 
-        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+        // used to restrict the start of the slice for the first partition in the range
+        private final ByteBuffer firstPartitionColumnStart;
+
+        // used to restrict the end of the slice for the last partition in the range
+        private final ByteBuffer lastPartitionColumnFinish;
+
+        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart, ByteBuffer lastPartitionColumnFinish, Comparator<ByteBuffer> comparator)
         {
             super(range, filter);
 
@@ -138,8 +152,8 @@ public class DataRange
 
             this.sliceFilter = filter;
             this.comparator = comparator;
-            this.columnStart = columnStart;
-            this.columnFinish = columnFinish;
+            this.firstPartitionColumnStart = firstPartitionColumnStart;
+            this.lastPartitionColumnFinish = lastPartitionColumnFinish;
         }
 
         public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
@@ -184,11 +198,11 @@ public class DataRange
         private ColumnSlice[] slicesForKey(ByteBuffer key)
         {
             // We don't call that until it's necessary, so assume we have to do some hard work
-            // Also note that columnStart and columnFinish, when used, only "restrict" the filter slices,
+            // Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict" the filter slices,
             // it doesn't expand on them. As such, we can ignore the case where they are empty and we do
             // as it screw up with the logic below (see #6592)
-            ByteBuffer newStart = equals(startKey(), key) && columnStart.hasRemaining() ? columnStart : null;
-            ByteBuffer newFinish = equals(stopKey(), key) && columnFinish.hasRemaining() ? columnFinish : null;
+            ByteBuffer newStart = equals(startKey(), key) && firstPartitionColumnStart.hasRemaining() ? firstPartitionColumnStart : null;
+            ByteBuffer newFinish = equals(stopKey(), key) && lastPartitionColumnFinish.hasRemaining() ? lastPartitionColumnFinish : null;
 
             List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index d6f3ca1..a8d7f49 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -97,15 +97,10 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
     public boolean countCQL3Rows()
     {
-        // We only use PagedRangeCommand for CQL3. However, for SELECT DISTINCT, we want to return false here, because
-        // we just want to pick the first cell of each partition and returning true here would throw off the logic in
-        // ColumnFamilyStore.filter().
-        // What we do know is that for a SELECT DISTINCT the underlying SliceQueryFilter will have a compositesToGroup==-1
-        // and a count==1. And while it would be possible for a normal SELECT on a COMPACT table to also have such
-        // parameters, it's fine returning false since if we do count one cell for each partition, then each partition
-        // will coincide with exactly one CQL3 row.
-        SliceQueryFilter filter = (SliceQueryFilter)predicate;
-        return filter.compositesToGroup >= 0 || filter.count != 1;
+        // For CQL3 queries, unless this is a DISTINCT query, the slice filter count is the LIMIT of the query.
+        // We don't page queries in the first place if their LIMIT <= pageSize and so we'll never page a query with
+        // a limit of 1. See CASSANDRA-8087 for more details.
+        return ((SliceQueryFilter)predicate).count != 1;
     }
 
     public List<Row> executeLocally()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2acbab65/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index e0ccc2f..58a0303 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -292,7 +292,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public int lastCounted()
     {
-        return columnCounter == null ? 0 : columnCounter.live();
+        // If we have a slice limit set, columnCounter.live() can overcount by one because we have to call
+        // columnCounter.count() before we can tell if we've exceeded the slice limit (and accordingly, should not
+        // add the cells to the returned container).  To deal with this overcounting, we take the min of the slice
+        // limit and the counter's count.
+        return columnCounter == null ? 0 : Math.min(columnCounter.live(), count);
     }
 
     public int lastIgnored()