You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/11/26 17:53:09 UTC
[5/7] cassandra git commit: Fix SELECT statement with IN restrictions
on partition key, ORDER BY and LIMIT
Fix SELECT statement with IN restrictions on partition key, ORDER BY and LIMIT
patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-10729
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e6c1d54
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e6c1d54
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e6c1d54
Branch: refs/heads/trunk
Commit: 7e6c1d5483b35ab911113dff0f5fd559760d733b
Parents: f4dab0f
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Nov 26 17:40:33 2015 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Nov 26 17:40:33 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 9 ++
.../cql3/statements/SelectStatement.java | 91 ++++++++++++++------
.../apache/cassandra/db/filter/DataLimits.java | 20 +++--
.../operations/SelectOrderByTest.java | 41 +++++++++
5 files changed, 127 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eab04fb..e1a959a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.0.1
+ * Fix SELECT statement with IN restrictions on partition key,
+ ORDER BY and LIMIT (CASSANDRA-10729)
* Improve stress performance over 1k threads (CASSANDRA-7217)
* Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
* Unable to create a function with argument of type Inet (CASSANDRA-10741)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d971d5e..02a9525 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+3.0.1
+=====
+
+Upgrading
+---------
+ - The return value of SelectStatement::getLimit has been changed from DataLimits
+ to int.
+
+
3.0
===
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index ab1da45..a9bb121 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -202,15 +202,16 @@ public class SelectStatement implements CQLStatement
cl.validateForRead(keyspace());
int nowInSec = FBUtilities.nowInSeconds();
- ReadQuery query = getQuery(options, nowInSec);
+ int userLimit = getLimit(options);
+ ReadQuery query = getQuery(options, nowInSec, userLimit);
int pageSize = getPageSize(options);
if (pageSize <= 0 || query.limits().count() <= pageSize)
- return execute(query, options, state, nowInSec);
+ return execute(query, options, state, nowInSec, userLimit);
QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
- return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
+ return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit);
}
private int getPageSize(QueryOptions options)
@@ -228,18 +229,27 @@ public class SelectStatement implements CQLStatement
public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
{
- DataLimits limit = getLimit(options);
+ return getQuery(options, nowInSec, getLimit(options));
+ }
+
+ public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit) throws RequestValidationException
+ {
+ DataLimits limit = getDataLimits(userLimit);
if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
return getRangeCommand(options, limit, nowInSec);
return getSliceCommands(options, limit, nowInSec);
}
- private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows execute(ReadQuery query,
+ QueryOptions options,
+ QueryState state,
+ int nowInSec,
+ int userLimit) throws RequestValidationException, RequestExecutionException
{
try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState()))
{
- return processResults(data, options, nowInSec);
+ return processResults(data, options, nowInSec, userLimit);
}
}
@@ -310,8 +320,11 @@ public class SelectStatement implements CQLStatement
}
}
- private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec)
- throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows execute(Pager pager,
+ QueryOptions options,
+ int pageSize,
+ int nowInSec,
+ int userLimit) throws RequestValidationException, RequestExecutionException
{
if (selection.isAggregate())
return pageAggregateQuery(pager, options, pageSize, nowInSec);
@@ -324,7 +337,7 @@ public class SelectStatement implements CQLStatement
ResultMessage.Rows msg;
try (PartitionIterator page = pager.fetchPage(pageSize))
{
- msg = processResults(page, options, nowInSec);
+ msg = processResults(page, options, nowInSec, userLimit);
}
// Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
@@ -366,16 +379,20 @@ public class SelectStatement implements CQLStatement
return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
}
- private ResultMessage.Rows processResults(PartitionIterator partitions, QueryOptions options, int nowInSec) throws RequestValidationException
+ private ResultMessage.Rows processResults(PartitionIterator partitions,
+ QueryOptions options,
+ int nowInSec,
+ int userLimit) throws RequestValidationException
{
- ResultSet rset = process(partitions, options, nowInSec);
+ ResultSet rset = process(partitions, options, nowInSec, userLimit);
return new ResultMessage.Rows(rset);
}
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
int nowInSec = FBUtilities.nowInSeconds();
- ReadQuery query = getQuery(options, nowInSec);
+ int userLimit = getLimit(options);
+ ReadQuery query = getQuery(options, nowInSec, userLimit);
int pageSize = getPageSize(options);
try (ReadOrderGroup orderGroup = query.startOrderGroup())
@@ -384,20 +401,20 @@ public class SelectStatement implements CQLStatement
{
try (PartitionIterator data = query.executeInternal(orderGroup))
{
- return processResults(data, options, nowInSec);
+ return processResults(data, options, nowInSec, userLimit);
}
}
else
{
QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
- return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
+ return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit);
}
}
}
public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
{
- return process(partitions, QueryOptions.DEFAULT, nowInSec);
+ return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
}
public String keyspace()
@@ -549,18 +566,37 @@ public class SelectStatement implements CQLStatement
return builder.build();
}
- /**
- * May be used by custom QueryHandler implementations
- */
- public DataLimits getLimit(QueryOptions options) throws InvalidRequestException
+ private DataLimits getDataLimits(int userLimit)
{
- int userLimit = -1;
+ int cqlRowLimit = DataLimits.NO_LIMIT;
+
// If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and
// since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
// return 1 result and can therefore basically ignore the user LIMIT in this case.
// Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus
// able to apply the user limit properly.
- if (limit != null && !selection.isAggregate())
+ // If we do post ordering we need to get all the results sorted before we can trim them.
+ if (!selection.isAggregate() && !needsPostQueryOrdering())
+ cqlRowLimit = userLimit;
+
+ if (parameters.isDistinct)
+ return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit);
+
+ return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.NONE : DataLimits.cqlLimits(cqlRowLimit);
+ }
+
+ /**
+ * Returns the limit specified by the user.
+ * May be used by custom QueryHandler implementations
+ *
+ * @return the limit specified by the user or <code>DataLimits.NO_LIMIT</code> if no value
+ * has been specified.
+ */
+ public int getLimit(QueryOptions options)
+ {
+ int userLimit = DataLimits.NO_LIMIT;
+
+ if (limit != null)
{
ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
// treat UNSET limit value as 'unlimited'
@@ -578,11 +614,7 @@ public class SelectStatement implements CQLStatement
}
}
}
-
- if (parameters.isDistinct)
- return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit);
-
- return userLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(userLimit);
+ return userLimit;
}
private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException
@@ -604,7 +636,10 @@ public class SelectStatement implements CQLStatement
return filter;
}
- private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException
+ private ResultSet process(PartitionIterator partitions,
+ QueryOptions options,
+ int nowInSec,
+ int userLimit) throws InvalidRequestException
{
Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
while (partitions.hasNext())
@@ -619,6 +654,8 @@ public class SelectStatement implements CQLStatement
orderResults(cqlRows);
+ cqlRows.trim(userLimit);
+
return cqlRows;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 130c6ba..19f24ad 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -41,7 +41,9 @@ public abstract class DataLimits
{
public static final Serializer serializer = new Serializer();
- public static final DataLimits NONE = new CQLLimits(Integer.MAX_VALUE)
+ public static final int NO_LIMIT = Integer.MAX_VALUE;
+
+ public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
{
@Override
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
@@ -64,7 +66,7 @@ public abstract class DataLimits
// We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
// partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
- public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true);
+ public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
@@ -236,7 +238,7 @@ public abstract class DataLimits
private CQLLimits(int rowLimit)
{
- this(rowLimit, Integer.MAX_VALUE);
+ this(rowLimit, NO_LIMIT);
}
private CQLLimits(int rowLimit, int perPartitionLimit)
@@ -263,7 +265,7 @@ public abstract class DataLimits
public boolean isUnlimited()
{
- return rowLimit == Integer.MAX_VALUE && perPartitionLimit == Integer.MAX_VALUE;
+ return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
}
public DataLimits forPaging(int pageSize)
@@ -281,7 +283,7 @@ public abstract class DataLimits
// When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
// we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
// for both argument or just for perPartitionLimit with no limit on rowLimit).
- return new CQLLimits(toFetch, Integer.MAX_VALUE, isDistinct);
+ return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
}
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec)
@@ -410,14 +412,14 @@ public abstract class DataLimits
{
StringBuilder sb = new StringBuilder();
- if (rowLimit != Integer.MAX_VALUE)
+ if (rowLimit != NO_LIMIT)
{
sb.append("LIMIT ").append(rowLimit);
- if (perPartitionLimit != Integer.MAX_VALUE)
+ if (perPartitionLimit != NO_LIMIT)
sb.append(' ');
}
- if (perPartitionLimit != Integer.MAX_VALUE)
+ if (perPartitionLimit != NO_LIMIT)
sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
return sb.toString();
@@ -508,7 +510,7 @@ public abstract class DataLimits
public boolean isUnlimited()
{
- return partitionLimit == Integer.MAX_VALUE && cellPerPartitionLimit == Integer.MAX_VALUE;
+ return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
}
public DataLimits forPaging(int pageSize)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
index 73bbaca..ae6f772 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
@@ -333,6 +333,12 @@ public class SelectOrderByTest extends CQLTester
assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"),
row(1), row(2), row(3));
+ assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 2"),
+ row(1), row(2));
+
+ assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 10"),
+ row(1), row(2), row(3));
+
assertRows(execute("SELECT col1, my_id FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"),
row(1, "key1"), row(2, "key3"), row(3, "key2"));
@@ -360,6 +366,15 @@ public class SelectOrderByTest extends CQLTester
row("A"),
row("D"));
+ assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 2; ", 1, 1, 2),
+ row("B"),
+ row("A"));
+
+ assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 10; ", 1, 1, 2),
+ row("B"),
+ row("A"),
+ row("D"));
+
assertRows(execute("SELECT v as c FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c; ", 1, 1, 2),
row("B"),
row("A"),
@@ -390,6 +405,32 @@ public class SelectOrderByTest extends CQLTester
row("B"),
row("D"),
row("A"));
+
+ assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 2; ", 1, 1, 2),
+ row("B"),
+ row("D"));
+
+ assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 10; ", 1, 1, 2),
+ row("B"),
+ row("D"),
+ row("A"));
+
+ assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC; ", 1, 1, 2),
+ row("A"),
+ row("D"),
+ row("B"));
+
+ assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 2; ", 1, 1, 2),
+ row("A"),
+ row("D"));
+
+ assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 10; ", 1, 1, 2),
+ row("A"),
+ row("D"),
+ row("B"));
+
+ assertInvalidMessage("LIMIT must be strictly positive",
+ "SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 0; ", 1, 1, 2);
}
/**