You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/11/26 17:53:09 UTC

[5/7] cassandra git commit: Fix SELECT statement with IN restrictions on partition key, ORDER BY and LIMIT

Fix SELECT statement with IN restrictions on partition key, ORDER BY and LIMIT

patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-10729


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e6c1d54
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e6c1d54
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e6c1d54

Branch: refs/heads/trunk
Commit: 7e6c1d5483b35ab911113dff0f5fd559760d733b
Parents: f4dab0f
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Nov 26 17:40:33 2015 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Nov 26 17:40:33 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 NEWS.txt                                        |  9 ++
 .../cql3/statements/SelectStatement.java        | 91 ++++++++++++++------
 .../apache/cassandra/db/filter/DataLimits.java  | 20 +++--
 .../operations/SelectOrderByTest.java           | 41 +++++++++
 5 files changed, 127 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eab04fb..e1a959a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0.1
+ * Fix SELECT statement with IN restrictions on partition key,
+   ORDER BY and LIMIT (CASSANDRA-10729)
  * Improve stress performance over 1k threads (CASSANDRA-7217)
  * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
  * Unable to create a function with argument of type Inet (CASSANDRA-10741)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d971d5e..02a9525 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.0.1
+=====
+
+Upgrading
+---------
+   - The return value of SelectStatement::getLimit has been changed from DataLimits
+     to int.
+
+
 3.0
 ===
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index ab1da45..a9bb121 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -202,15 +202,16 @@ public class SelectStatement implements CQLStatement
         cl.validateForRead(keyspace());
 
         int nowInSec = FBUtilities.nowInSeconds();
-        ReadQuery query = getQuery(options, nowInSec);
+        int userLimit = getLimit(options);
+        ReadQuery query = getQuery(options, nowInSec, userLimit);
 
         int pageSize = getPageSize(options);
 
         if (pageSize <= 0 || query.limits().count() <= pageSize)
-            return execute(query, options, state, nowInSec);
+            return execute(query, options, state, nowInSec, userLimit);
 
         QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
-        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
+        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit);
     }
 
     private int getPageSize(QueryOptions options)
@@ -228,18 +229,27 @@ public class SelectStatement implements CQLStatement
 
     public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
     {
-        DataLimits limit = getLimit(options);
+        return getQuery(options, nowInSec, getLimit(options));
+    }
+
+    public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit) throws RequestValidationException
+    {
+        DataLimits limit = getDataLimits(userLimit);
         if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
             return getRangeCommand(options, limit, nowInSec);
 
         return getSliceCommands(options, limit, nowInSec);
     }
 
-    private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows execute(ReadQuery query,
+                                       QueryOptions options,
+                                       QueryState state,
+                                       int nowInSec,
+                                       int userLimit) throws RequestValidationException, RequestExecutionException
     {
         try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState()))
         {
-            return processResults(data, options, nowInSec);
+            return processResults(data, options, nowInSec, userLimit);
         }
     }
 
@@ -310,8 +320,11 @@ public class SelectStatement implements CQLStatement
         }
     }
 
-    private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec)
-    throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows execute(Pager pager,
+                                       QueryOptions options,
+                                       int pageSize,
+                                       int nowInSec,
+                                       int userLimit) throws RequestValidationException, RequestExecutionException
     {
         if (selection.isAggregate())
             return pageAggregateQuery(pager, options, pageSize, nowInSec);
@@ -324,7 +337,7 @@ public class SelectStatement implements CQLStatement
         ResultMessage.Rows msg;
         try (PartitionIterator page = pager.fetchPage(pageSize))
         {
-            msg = processResults(page, options, nowInSec);
+            msg = processResults(page, options, nowInSec, userLimit);
         }
 
         // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
@@ -366,16 +379,20 @@ public class SelectStatement implements CQLStatement
         return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
     }
 
-    private ResultMessage.Rows processResults(PartitionIterator partitions, QueryOptions options, int nowInSec) throws RequestValidationException
+    private ResultMessage.Rows processResults(PartitionIterator partitions,
+                                              QueryOptions options,
+                                              int nowInSec,
+                                              int userLimit) throws RequestValidationException
     {
-        ResultSet rset = process(partitions, options, nowInSec);
+        ResultSet rset = process(partitions, options, nowInSec, userLimit);
         return new ResultMessage.Rows(rset);
     }
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         int nowInSec = FBUtilities.nowInSeconds();
-        ReadQuery query = getQuery(options, nowInSec);
+        int userLimit = getLimit(options);
+        ReadQuery query = getQuery(options, nowInSec, userLimit);
         int pageSize = getPageSize(options);
 
         try (ReadOrderGroup orderGroup = query.startOrderGroup())
@@ -384,20 +401,20 @@ public class SelectStatement implements CQLStatement
             {
                 try (PartitionIterator data = query.executeInternal(orderGroup))
                 {
-                    return processResults(data, options, nowInSec);
+                    return processResults(data, options, nowInSec, userLimit);
                 }
             }
             else
             {
                 QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
-                return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
+                return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit);
             }
         }
     }
 
     public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
     {
-        return process(partitions, QueryOptions.DEFAULT, nowInSec);
+        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
     }
 
     public String keyspace()
@@ -549,18 +566,37 @@ public class SelectStatement implements CQLStatement
         return builder.build();
     }
 
-    /**
-     * May be used by custom QueryHandler implementations
-     */
-    public DataLimits getLimit(QueryOptions options) throws InvalidRequestException
+    private DataLimits getDataLimits(int userLimit)
     {
-        int userLimit = -1;
+        int cqlRowLimit = DataLimits.NO_LIMIT;
+
         // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and
         // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
         // return 1 result and can therefore basically ignore the user LIMIT in this case.
         // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus
         // able to apply the user limit properly.
-        if (limit != null && !selection.isAggregate())
+        // If we do post ordering we need to get all the results sorted before we can trim them.
+        if (!selection.isAggregate() && !needsPostQueryOrdering())
+            cqlRowLimit = userLimit;
+
+        if (parameters.isDistinct)
+            return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit);
+
+        return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.NONE : DataLimits.cqlLimits(cqlRowLimit);
+    }
+
+    /**
+     * Returns the limit specified by the user.
+     * May be used by custom QueryHandler implementations
+     *
+     * @return the limit specified by the user or <code>DataLimits.NO_LIMIT</code> if no value 
+     * has been specified.
+     */
+    public int getLimit(QueryOptions options)
+    {
+        int userLimit = DataLimits.NO_LIMIT;
+
+        if (limit != null)
         {
             ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
             // treat UNSET limit value as 'unlimited'
@@ -578,11 +614,7 @@ public class SelectStatement implements CQLStatement
                 }
             }
         }
-
-        if (parameters.isDistinct)
-            return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit);
-
-        return userLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(userLimit);
+        return userLimit;
     }
 
     private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException
@@ -604,7 +636,10 @@ public class SelectStatement implements CQLStatement
         return filter;
     }
 
-    private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException
+    private ResultSet process(PartitionIterator partitions,
+                              QueryOptions options,
+                              int nowInSec,
+                              int userLimit) throws InvalidRequestException
     {
         Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
         while (partitions.hasNext())
@@ -619,6 +654,8 @@ public class SelectStatement implements CQLStatement
 
         orderResults(cqlRows);
 
+        cqlRows.trim(userLimit);
+
         return cqlRows;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 130c6ba..19f24ad 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -41,7 +41,9 @@ public abstract class DataLimits
 {
     public static final Serializer serializer = new Serializer();
 
-    public static final DataLimits NONE = new CQLLimits(Integer.MAX_VALUE)
+    public static final int NO_LIMIT = Integer.MAX_VALUE;
+
+    public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
     {
         @Override
         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
@@ -64,7 +66,7 @@ public abstract class DataLimits
 
     // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
     // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
-    public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true);
+    public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
 
     public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
 
@@ -236,7 +238,7 @@ public abstract class DataLimits
 
         private CQLLimits(int rowLimit)
         {
-            this(rowLimit, Integer.MAX_VALUE);
+            this(rowLimit, NO_LIMIT);
         }
 
         private CQLLimits(int rowLimit, int perPartitionLimit)
@@ -263,7 +265,7 @@ public abstract class DataLimits
 
         public boolean isUnlimited()
         {
-            return rowLimit == Integer.MAX_VALUE && perPartitionLimit == Integer.MAX_VALUE;
+            return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
         }
 
         public DataLimits forPaging(int pageSize)
@@ -281,7 +283,7 @@ public abstract class DataLimits
             // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
             // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
             // for both argument or just for perPartitionLimit with no limit on rowLimit).
-            return new CQLLimits(toFetch, Integer.MAX_VALUE, isDistinct);
+            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
         }
 
         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
@@ -410,14 +412,14 @@ public abstract class DataLimits
         {
             StringBuilder sb = new StringBuilder();
 
-            if (rowLimit != Integer.MAX_VALUE)
+            if (rowLimit != NO_LIMIT)
             {
                 sb.append("LIMIT ").append(rowLimit);
-                if (perPartitionLimit != Integer.MAX_VALUE)
+                if (perPartitionLimit != NO_LIMIT)
                     sb.append(' ');
             }
 
-            if (perPartitionLimit != Integer.MAX_VALUE)
+            if (perPartitionLimit != NO_LIMIT)
                 sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
 
             return sb.toString();
@@ -508,7 +510,7 @@ public abstract class DataLimits
 
         public boolean isUnlimited()
         {
-            return partitionLimit == Integer.MAX_VALUE && cellPerPartitionLimit == Integer.MAX_VALUE;
+            return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
         }
 
         public DataLimits forPaging(int pageSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
index 73bbaca..ae6f772 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
@@ -333,6 +333,12 @@ public class SelectOrderByTest extends CQLTester
         assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"),
                    row(1), row(2), row(3));
 
+        assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 2"),
+                   row(1), row(2));
+
+        assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 10"),
+                   row(1), row(2), row(3));
+
         assertRows(execute("SELECT col1, my_id FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"),
                    row(1, "key1"), row(2, "key3"), row(3, "key2"));
 
@@ -360,6 +366,15 @@ public class SelectOrderByTest extends CQLTester
                    row("A"),
                    row("D"));
 
+        assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 2; ", 1, 1, 2),
+                   row("B"),
+                   row("A"));
+
+        assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 10; ", 1, 1, 2),
+                   row("B"),
+                   row("A"),
+                   row("D"));
+
         assertRows(execute("SELECT v as c FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c; ", 1, 1, 2),
                    row("B"),
                    row("A"),
@@ -390,6 +405,32 @@ public class SelectOrderByTest extends CQLTester
                    row("B"),
                    row("D"),
                    row("A"));
+
+        assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 2; ", 1, 1, 2),
+                   row("B"),
+                   row("D"));
+
+        assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 10; ", 1, 1, 2),
+                   row("B"),
+                   row("D"),
+                   row("A"));
+
+        assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC; ", 1, 1, 2),
+                   row("A"),
+                   row("D"),
+                   row("B"));
+
+        assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 2; ", 1, 1, 2),
+                   row("A"),
+                   row("D"));
+
+        assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 10; ", 1, 1, 2),
+                   row("A"),
+                   row("D"),
+                   row("B"));
+
+        assertInvalidMessage("LIMIT must be strictly positive",
+                             "SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 0; ", 1, 1, 2);
     }
 
     /**