You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2016/05/20 06:23:06 UTC

calcite git commit: [CALCITE-1235] Fully push down limit+offset in Cassandra

Repository: calcite
Updated Branches:
  refs/heads/master d58bf9150 -> 4c89dce30


[CALCITE-1235] Fully push down limit+offset in Cassandra

Close apache/calcite#229


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/4c89dce3
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/4c89dce3
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/4c89dce3

Branch: refs/heads/master
Commit: 4c89dce303f8760e10bde3ab2b387780ae059e31
Parents: d58bf91
Author: Michael Mior <mi...@gmail.com>
Authored: Fri May 20 09:08:54 2016 +0300
Committer: Michael Mior <mi...@gmail.com>
Committed: Fri May 20 09:19:43 2016 +0300

----------------------------------------------------------------------
 .../adapter/cassandra/CassandraLimit.java       | 71 ++++++++++++++++++++
 .../adapter/cassandra/CassandraMethod.java      |  2 +-
 .../calcite/adapter/cassandra/CassandraRel.java |  7 +-
 .../adapter/cassandra/CassandraRules.java       | 39 +++++++++--
 .../adapter/cassandra/CassandraSort.java        | 20 ++----
 .../adapter/cassandra/CassandraTable.java       | 20 ++++--
 .../CassandraToEnumerableConverter.java         | 11 +--
 .../apache/calcite/test/CassandraAdapterIT.java | 32 +++++++--
 8 files changed, 161 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
new file mode 100644
index 0000000..cca7e19
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Implementation of limits in Cassandra; records the offset and fetch on the Implementor.
+ */
+public class CassandraLimit extends SingleRel implements CassandraRel {
+  public final RexNode offset;
+  public final RexNode fetch;
+
+  public CassandraLimit(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, input);
+    this.offset = offset;
+    this.fetch = fetch;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    // Report zero cost so that the planner treats the pushed-down limit as free
+    return planner.getCostFactory().makeZeroCost();
+  }
+
+  @Override public CassandraLimit copy(RelTraitSet traitSet, List<RelNode> newInputs) {
+    return new CassandraLimit(getCluster(), traitSet, sole(newInputs), offset, fetch);
+  }
+
+  public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());  // input is implemented before offset/fetch are set
+    if (offset != null) { implementor.offset = RexLiteral.intValue(offset); }
+    if (fetch != null) { implementor.fetch = RexLiteral.intValue(fetch); }
+  }
+
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+    pw.itemIf("offset", offset, offset != null);
+    pw.itemIf("fetch", fetch, fetch != null);
+    return pw;
+  }
+}
+
+// End CassandraLimit.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
index c7d0973..b2035e5 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
@@ -28,7 +28,7 @@ import java.util.List;
  */
 public enum CassandraMethod {
   CASSANDRA_QUERYABLE_QUERY(CassandraTable.CassandraQueryable.class, "query",
-      List.class, List.class, List.class, List.class, String.class);
+      List.class, List.class, List.class, List.class, Integer.class, Integer.class);
 
   public final Method method;
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
index 0191fd0..b74919d 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
@@ -39,7 +39,8 @@ public interface CassandraRel extends RelNode {
   class Implementor {
     final Map<String, String> selectFields = new LinkedHashMap<String, String>();
     final List<String> whereClause = new ArrayList<String>();
-    String limitValue = null;
+    int offset = 0;
+    int fetch = -1;
     final List<String> order = new ArrayList<String>();
 
     RelOptTable table;
@@ -63,10 +64,6 @@ public interface CassandraRel extends RelNode {
       order.addAll(newOrder);
     }
 
-    public void setLimit(String limit) {
-      limitValue = limit;
-    }
-
     public void visitChild(int ordinal, RelNode input) {
       assert ordinal == 0;
       ((CassandraRel) input).implement(this);

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
index 0e0bcb2..d67532d 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.cassandra;
 
+import org.apache.calcite.adapter.enumerable.EnumerableLimit;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRule;
@@ -59,7 +60,8 @@ public class CassandraRules {
   public static final RelOptRule[] RULES = {
     CassandraFilterRule.INSTANCE,
     CassandraProjectRule.INSTANCE,
-    CassandraSortRule.INSTANCE
+    CassandraSortRule.INSTANCE,
+    CassandraLimitRule.INSTANCE
   };
 
   static List<String> cassandraFieldNames(final RelDataType rowType) {
@@ -277,8 +279,8 @@ public class CassandraRules {
     private static final Predicate<Sort> SORT_PREDICATE =
         new Predicate<Sort>() {
           public boolean apply(Sort input) {
-            // CQL has no support for offsets
-            return input.offset == null;
+            // Limits are handled by CassandraLimit
+            return input.offset == null && input.fetch == null;
           }
         };
     private static final Predicate<CassandraFilter> FILTER_PREDICATE =
@@ -304,7 +306,7 @@ public class CassandraRules {
               .replace(sort.getCollation());
       return new CassandraSort(sort.getCluster(), traitSet,
           convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)),
-          sort.getCollation(), filter.getImplicitCollation(), sort.fetch);
+          sort.getCollation());
     }
 
     public boolean matches(RelOptRuleCall call) {
@@ -383,6 +385,35 @@ public class CassandraRules {
       }
     }
   }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.adapter.enumerable.EnumerableLimit} to a
+   * {@link CassandraLimit}.
+   */
+  private static class CassandraLimitRule extends RelOptRule {
+    private static final CassandraLimitRule INSTANCE = new CassandraLimitRule();
+
+    private CassandraLimitRule() {
+      super(operand(EnumerableLimit.class, operand(CassandraToEnumerableConverter.class, any())),
+        "CassandraLimitRule");
+    }
+
+    public RelNode convert(EnumerableLimit limit) {
+      final RelTraitSet traitSet =
+          limit.getTraitSet().replace(CassandraRel.CONVENTION);
+      return new CassandraLimit(limit.getCluster(), traitSet,
+        convert(limit.getInput(), CassandraRel.CONVENTION), limit.offset, limit.fetch);
+    }
+
+    /** @see org.apache.calcite.rel.convert.ConverterRule */
+    public void onMatch(RelOptRuleCall call) {
+      final EnumerableLimit limit = call.rel(0);
+      final RelNode converted = convert(limit);
+      if (converted != null) {
+        call.transformTo(converted);
+      }
+    }
+  }
 }
 
 // End CassandraRules.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
index 61d7b31..8487815 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
@@ -26,7 +26,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 
 import java.util.ArrayList;
@@ -37,13 +36,9 @@ import java.util.List;
  * relational expression in Cassandra.
  */
 public class CassandraSort extends Sort implements CassandraRel {
-  private final RelCollation implicitCollation;
-
   public CassandraSort(RelOptCluster cluster, RelTraitSet traitSet,
-      RelNode child, RelCollation collation, RelCollation implicitCollation, RexNode fetch) {
-    super(cluster, traitSet, child, collation, null, fetch);
-
-    this.implicitCollation = implicitCollation;
+      RelNode child, RelCollation collation) {
+    super(cluster, traitSet, child, collation, null, null);
 
     assert getConvention() == CassandraRel.CONVENTION;
     assert getConvention() == child.getConvention();
@@ -54,18 +49,14 @@ public class CassandraSort extends Sort implements CassandraRel {
     RelOptCost cost = super.computeSelfCost(planner, mq);
     if (!collation.getFieldCollations().isEmpty()) {
       return cost.multiplyBy(0.05);
-    } else if (fetch == null) {
-      return cost;
     } else {
-      // We do this so we get the limit for free
-      return planner.getCostFactory().makeZeroCost();
+      return cost;
     }
   }
 
   @Override public Sort copy(RelTraitSet traitSet, RelNode input,
       RelCollation newCollation, RexNode offset, RexNode fetch) {
-    return new CassandraSort(getCluster(), traitSet, input, collation, implicitCollation,
-        fetch);
+    return new CassandraSort(getCluster(), traitSet, input, collation);
   }
 
   public void implement(Implementor implementor) {
@@ -88,9 +79,6 @@ public class CassandraSort extends Sort implements CassandraRel {
 
       implementor.addOrder(fieldOrder);
     }
-    if (fetch != null) {
-      implementor.setLimit(((RexLiteral) fetch).getValue().toString());
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index 1ff5502..ec2a636 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -99,7 +99,7 @@ public class CassandraTable extends AbstractQueryableTable
   public Enumerable<Object> query(final Session session) {
     return query(session, Collections.<Map.Entry<String, Class>>emptyList(),
         Collections.<Map.Entry<String, String>>emptyList(),
-        Collections.<String>emptyList(), Collections.<String>emptyList(), null);
+        Collections.<String>emptyList(), Collections.<String>emptyList(), 0, -1);
   }
 
   /** Executes a CQL query on the underlying table.
@@ -111,7 +111,7 @@ public class CassandraTable extends AbstractQueryableTable
    */
   public Enumerable<Object> query(final Session session, List<Map.Entry<String, Class>> fields,
         final List<Map.Entry<String, String>> selectFields, List<String> predicates,
-        List<String> order, String limit) {
+        List<String> order, final Integer offset, final Integer fetch) {
     // Build the type of the resulting row based on the provided fields
     final RelDataTypeFactory typeFactory =
         new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -181,7 +181,10 @@ public class CassandraTable extends AbstractQueryableTable
     if (!order.isEmpty()) {
       queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
     }
-    if (limit != null) {
+
+    int limit = offset;
+    if (fetch >= 0) { limit += fetch; }
+    if (limit > 0) {
       queryBuilder.append(" LIMIT " + limit);
     }
     queryBuilder.append(" ALLOW FILTERING");
@@ -190,7 +193,12 @@ public class CassandraTable extends AbstractQueryableTable
     return new AbstractEnumerable<Object>() {
       public Enumerator<Object> enumerator() {
         final ResultSet results = session.execute(query);
-        return new CassandraEnumerator(results, resultRowType);
+        // Skip results until we get to the right offset
+        int skip = 0;
+        Enumerator<Object> enumerator = new CassandraEnumerator(results, resultRowType);
+        while (skip < offset && enumerator.moveNext()) { skip++; }
+
+        return enumerator;
       }
     };
   }
@@ -238,9 +246,9 @@ public class CassandraTable extends AbstractQueryableTable
     @SuppressWarnings("UnusedDeclaration")
     public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
         List<Map.Entry<String, String>> selectFields, List<String> predicates,
-        List<String> order, String limit) {
+        List<String> order, Integer offset, Integer fetch) {
       return getTable().query(getSession(), fields, selectFields, predicates,
-          order, limit);
+          order, offset, fetch);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
index 31bfdcc..66db1ff 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
@@ -112,14 +112,17 @@ public class CassandraToEnumerableConverter
     final Expression order =
         list.append("order",
             constantArrayList(cassandraImplementor.order, String.class));
-    final Expression limit =
-        list.append("limit",
-            Expressions.constant(cassandraImplementor.limitValue));
+    final Expression offset =
+        list.append("offset",
+            Expressions.constant(cassandraImplementor.offset));
+    final Expression fetch =
+        list.append("fetch",
+            Expressions.constant(cassandraImplementor.fetch));
     Expression enumerable =
         list.append("enumerable",
             Expressions.call(table,
                 CassandraMethod.CASSANDRA_QUERYABLE_QUERY.method, fields,
-                selectFields, predicates, order, limit));
+                selectFields, predicates, order, offset, fetch));
     if (CalcitePrepareImpl.DEBUG) {
       System.out.println("Cassandra: " + predicates);
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
index a2c132e..f1e87ff 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -103,11 +103,12 @@ public class CassandraAdapterIT {
     CalciteAssert.that()
         .enable(enabled())
         .with(TWISSANDRA)
-        .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 1")
-        .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+        .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 2")
+        .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n"
+               + "tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n")
         .explainContains("PLAN=CassandraToEnumerableConverter\n"
-                + "  CassandraProject(tweet_id=[$2])\n"
-                + "    CassandraSort(fetch=[1])\n"
+                + "  CassandraLimit(fetch=[2])\n"
+                + "    CassandraProject(tweet_id=[$2])\n"
                 + "      CassandraFilter(condition=[=(CAST($0):VARCHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
   }
 
@@ -133,7 +134,28 @@ public class CassandraAdapterIT {
         .enable(enabled())
         .with(TWISSANDRA)
         .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 8")
-        .explainContains("CassandraSort(fetch=[8])\n");
+        .explainContains("CassandraLimit(fetch=[8])\n");
+  }
+
+  @Test public void testSortLimit() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"userline\" where \"username\"='!PUBLIC!' "
+             + "order by \"time\" desc limit 10")
+        .explainContains("  CassandraLimit(fetch=[10])\n"
+                       + "    CassandraSort(sort0=[$1], dir0=[DESC])");
+  }
+
+  @Test public void testSortOffset() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select \"tweet_id\" from \"userline\" where "
+             + "\"username\"='!PUBLIC!' limit 2 offset 1")
+        .explainContains("CassandraLimit(offset=[1], fetch=[2])")
+        .returns("tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n"
+               + "tweet_id=f3e4182e-d05b-11e5-b58b-90e2ba530b12\n");
   }
 
   @Test public void testMaterializedView() {