You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2016/05/20 06:23:06 UTC
calcite git commit: [CALCITE-1235] Fully push down limit+offset in
Cassandra
Repository: calcite
Updated Branches:
refs/heads/master d58bf9150 -> 4c89dce30
[CALCITE-1235] Fully push down limit+offset in Cassandra
Close apache/calcite#229
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/4c89dce3
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/4c89dce3
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/4c89dce3
Branch: refs/heads/master
Commit: 4c89dce303f8760e10bde3ab2b387780ae059e31
Parents: d58bf91
Author: Michael Mior <mi...@gmail.com>
Authored: Fri May 20 09:08:54 2016 +0300
Committer: Michael Mior <mi...@gmail.com>
Committed: Fri May 20 09:19:43 2016 +0300
----------------------------------------------------------------------
.../adapter/cassandra/CassandraLimit.java | 71 ++++++++++++++++++++
.../adapter/cassandra/CassandraMethod.java | 2 +-
.../calcite/adapter/cassandra/CassandraRel.java | 7 +-
.../adapter/cassandra/CassandraRules.java | 39 +++++++++--
.../adapter/cassandra/CassandraSort.java | 20 ++----
.../adapter/cassandra/CassandraTable.java | 20 ++++--
.../CassandraToEnumerableConverter.java | 11 +--
.../apache/calcite/test/CassandraAdapterIT.java | 32 +++++++--
8 files changed, 161 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
new file mode 100644
index 0000000..cca7e19
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Implementation of limits in Cassandra.
+ */
+public class CassandraLimit extends SingleRel implements CassandraRel {
+ public final RexNode offset;
+ public final RexNode fetch;
+
+ public CassandraLimit(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, input);
+ this.offset = offset;
+ this.fetch = fetch;
+ assert getConvention() == input.getConvention();
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ // We do this so we get the limit for free
+ return planner.getCostFactory().makeZeroCost();
+ }
+
+ @Override public CassandraLimit copy(RelTraitSet traitSet, List<RelNode> newInputs) {
+ return new CassandraLimit(getCluster(), traitSet, sole(newInputs), offset, fetch);
+ }
+
+ public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+ if (offset != null) { implementor.offset = RexLiteral.intValue(offset); }
+ if (fetch != null) { implementor.fetch = RexLiteral.intValue(fetch); }
+ }
+
+ public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw);
+ pw.itemIf("offset", offset, offset != null);
+ pw.itemIf("fetch", fetch, fetch != null);
+ return pw;
+ }
+}
+
+// End CassandraLimit.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
index c7d0973..b2035e5 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
@@ -28,7 +28,7 @@ import java.util.List;
*/
public enum CassandraMethod {
CASSANDRA_QUERYABLE_QUERY(CassandraTable.CassandraQueryable.class, "query",
- List.class, List.class, List.class, List.class, String.class);
+ List.class, List.class, List.class, List.class, Integer.class, Integer.class);
public final Method method;
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
index 0191fd0..b74919d 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
@@ -39,7 +39,8 @@ public interface CassandraRel extends RelNode {
class Implementor {
final Map<String, String> selectFields = new LinkedHashMap<String, String>();
final List<String> whereClause = new ArrayList<String>();
- String limitValue = null;
+ int offset = 0;
+ int fetch = -1;
final List<String> order = new ArrayList<String>();
RelOptTable table;
@@ -63,10 +64,6 @@ public interface CassandraRel extends RelNode {
order.addAll(newOrder);
}
- public void setLimit(String limit) {
- limitValue = limit;
- }
-
public void visitChild(int ordinal, RelNode input) {
assert ordinal == 0;
((CassandraRel) input).implement(this);
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
index 0e0bcb2..d67532d 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.cassandra;
+import org.apache.calcite.adapter.enumerable.EnumerableLimit;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
@@ -59,7 +60,8 @@ public class CassandraRules {
public static final RelOptRule[] RULES = {
CassandraFilterRule.INSTANCE,
CassandraProjectRule.INSTANCE,
- CassandraSortRule.INSTANCE
+ CassandraSortRule.INSTANCE,
+ CassandraLimitRule.INSTANCE
};
static List<String> cassandraFieldNames(final RelDataType rowType) {
@@ -277,8 +279,8 @@ public class CassandraRules {
private static final Predicate<Sort> SORT_PREDICATE =
new Predicate<Sort>() {
public boolean apply(Sort input) {
- // CQL has no support for offsets
- return input.offset == null;
+ // Limits are handled by CassandraLimit
+ return input.offset == null && input.fetch == null;
}
};
private static final Predicate<CassandraFilter> FILTER_PREDICATE =
@@ -304,7 +306,7 @@ public class CassandraRules {
.replace(sort.getCollation());
return new CassandraSort(sort.getCluster(), traitSet,
convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)),
- sort.getCollation(), filter.getImplicitCollation(), sort.fetch);
+ sort.getCollation());
}
public boolean matches(RelOptRuleCall call) {
@@ -383,6 +385,35 @@ public class CassandraRules {
}
}
}
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.adapter.enumerable.EnumerableLimit} to a
+ * {@link CassandraLimit}.
+ */
+ private static class CassandraLimitRule extends RelOptRule {
+ private static final CassandraLimitRule INSTANCE = new CassandraLimitRule();
+
+ private CassandraLimitRule() {
+ super(operand(EnumerableLimit.class, operand(CassandraToEnumerableConverter.class, any())),
+ "CassandraLimitRule");
+ }
+
+ public RelNode convert(EnumerableLimit limit) {
+ final RelTraitSet traitSet =
+ limit.getTraitSet().replace(CassandraRel.CONVENTION);
+ return new CassandraLimit(limit.getCluster(), traitSet,
+ convert(limit.getInput(), CassandraRel.CONVENTION), limit.offset, limit.fetch);
+ }
+
+ /** @see org.apache.calcite.rel.convert.ConverterRule */
+ public void onMatch(RelOptRuleCall call) {
+ final EnumerableLimit limit = call.rel(0);
+ final RelNode converted = convert(limit);
+ if (converted != null) {
+ call.transformTo(converted);
+ }
+ }
+ }
}
// End CassandraRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
index 61d7b31..8487815 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
@@ -26,7 +26,6 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import java.util.ArrayList;
@@ -37,13 +36,9 @@ import java.util.List;
* relational expression in Cassandra.
*/
public class CassandraSort extends Sort implements CassandraRel {
- private final RelCollation implicitCollation;
-
public CassandraSort(RelOptCluster cluster, RelTraitSet traitSet,
- RelNode child, RelCollation collation, RelCollation implicitCollation, RexNode fetch) {
- super(cluster, traitSet, child, collation, null, fetch);
-
- this.implicitCollation = implicitCollation;
+ RelNode child, RelCollation collation) {
+ super(cluster, traitSet, child, collation, null, null);
assert getConvention() == CassandraRel.CONVENTION;
assert getConvention() == child.getConvention();
@@ -54,18 +49,14 @@ public class CassandraSort extends Sort implements CassandraRel {
RelOptCost cost = super.computeSelfCost(planner, mq);
if (!collation.getFieldCollations().isEmpty()) {
return cost.multiplyBy(0.05);
- } else if (fetch == null) {
- return cost;
} else {
- // We do this so we get the limit for free
- return planner.getCostFactory().makeZeroCost();
+ return cost;
}
}
@Override public Sort copy(RelTraitSet traitSet, RelNode input,
RelCollation newCollation, RexNode offset, RexNode fetch) {
- return new CassandraSort(getCluster(), traitSet, input, collation, implicitCollation,
- fetch);
+ return new CassandraSort(getCluster(), traitSet, input, collation);
}
public void implement(Implementor implementor) {
@@ -88,9 +79,6 @@ public class CassandraSort extends Sort implements CassandraRel {
implementor.addOrder(fieldOrder);
}
- if (fetch != null) {
- implementor.setLimit(((RexLiteral) fetch).getValue().toString());
- }
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index 1ff5502..ec2a636 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -99,7 +99,7 @@ public class CassandraTable extends AbstractQueryableTable
public Enumerable<Object> query(final Session session) {
return query(session, Collections.<Map.Entry<String, Class>>emptyList(),
Collections.<Map.Entry<String, String>>emptyList(),
- Collections.<String>emptyList(), Collections.<String>emptyList(), null);
+ Collections.<String>emptyList(), Collections.<String>emptyList(), 0, -1);
}
/** Executes a CQL query on the underlying table.
@@ -111,7 +111,7 @@ public class CassandraTable extends AbstractQueryableTable
*/
public Enumerable<Object> query(final Session session, List<Map.Entry<String, Class>> fields,
final List<Map.Entry<String, String>> selectFields, List<String> predicates,
- List<String> order, String limit) {
+ List<String> order, final Integer offset, final Integer fetch) {
// Build the type of the resulting row based on the provided fields
final RelDataTypeFactory typeFactory =
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -181,7 +181,10 @@ public class CassandraTable extends AbstractQueryableTable
if (!order.isEmpty()) {
queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
}
- if (limit != null) {
+
+ int limit = offset;
+ if (fetch >= 0) { limit += fetch; }
+ if (limit > 0) {
queryBuilder.append(" LIMIT " + limit);
}
queryBuilder.append(" ALLOW FILTERING");
@@ -190,7 +193,12 @@ public class CassandraTable extends AbstractQueryableTable
return new AbstractEnumerable<Object>() {
public Enumerator<Object> enumerator() {
final ResultSet results = session.execute(query);
- return new CassandraEnumerator(results, resultRowType);
+ // Skip results until we get to the right offset
+ int skip = 0;
+ Enumerator<Object> enumerator = new CassandraEnumerator(results, resultRowType);
+ while (skip < offset && enumerator.moveNext()) { skip++; }
+
+ return enumerator;
}
};
}
@@ -238,9 +246,9 @@ public class CassandraTable extends AbstractQueryableTable
@SuppressWarnings("UnusedDeclaration")
public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
List<Map.Entry<String, String>> selectFields, List<String> predicates,
- List<String> order, String limit) {
+ List<String> order, Integer offset, Integer fetch) {
return getTable().query(getSession(), fields, selectFields, predicates,
- order, limit);
+ order, offset, fetch);
}
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
index 31bfdcc..66db1ff 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
@@ -112,14 +112,17 @@ public class CassandraToEnumerableConverter
final Expression order =
list.append("order",
constantArrayList(cassandraImplementor.order, String.class));
- final Expression limit =
- list.append("limit",
- Expressions.constant(cassandraImplementor.limitValue));
+ final Expression offset =
+ list.append("offset",
+ Expressions.constant(cassandraImplementor.offset));
+ final Expression fetch =
+ list.append("fetch",
+ Expressions.constant(cassandraImplementor.fetch));
Expression enumerable =
list.append("enumerable",
Expressions.call(table,
CassandraMethod.CASSANDRA_QUERYABLE_QUERY.method, fields,
- selectFields, predicates, order, limit));
+ selectFields, predicates, order, offset, fetch));
if (CalcitePrepareImpl.DEBUG) {
System.out.println("Cassandra: " + predicates);
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
index a2c132e..f1e87ff 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -103,11 +103,12 @@ public class CassandraAdapterIT {
CalciteAssert.that()
.enable(enabled())
.with(TWISSANDRA)
- .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 1")
- .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+ .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 2")
+ .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n"
+ + "tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n")
.explainContains("PLAN=CassandraToEnumerableConverter\n"
- + " CassandraProject(tweet_id=[$2])\n"
- + " CassandraSort(fetch=[1])\n"
+ + " CassandraLimit(fetch=[2])\n"
+ + " CassandraProject(tweet_id=[$2])\n"
+ " CassandraFilter(condition=[=(CAST($0):VARCHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
}
@@ -133,7 +134,28 @@ public class CassandraAdapterIT {
.enable(enabled())
.with(TWISSANDRA)
.query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 8")
- .explainContains("CassandraSort(fetch=[8])\n");
+ .explainContains("CassandraLimit(fetch=[8])\n");
+ }
+
+ @Test public void testSortLimit() {
+ CalciteAssert.that()
+ .enable(enabled())
+ .with(TWISSANDRA)
+ .query("select * from \"userline\" where \"username\"='!PUBLIC!' "
+ + "order by \"time\" desc limit 10")
+ .explainContains(" CassandraLimit(fetch=[10])\n"
+ + " CassandraSort(sort0=[$1], dir0=[DESC])");
+ }
+
+ @Test public void testSortOffset() {
+ CalciteAssert.that()
+ .enable(enabled())
+ .with(TWISSANDRA)
+ .query("select \"tweet_id\" from \"userline\" where "
+ + "\"username\"='!PUBLIC!' limit 2 offset 1")
+ .explainContains("CassandraLimit(offset=[1], fetch=[2])")
+ .returns("tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n"
+ + "tweet_id=f3e4182e-d05b-11e5-b58b-90e2ba530b12\n");
}
@Test public void testMaterializedView() {