Posted to commits@calcite.apache.org by jc...@apache.org on 2017/04/11 12:43:22 UTC

[1/2] calcite git commit: [CALCITE-1725] Push project aggregate of time extract to druid

Repository: calcite
Updated Branches:
  refs/heads/master 35cd2abc8 -> b48f634fe


[CALCITE-1725] Push project aggregate of time extract to druid

* Fix 'testPushAggregateOnTime' test.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e76d5ee3
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e76d5ee3
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e76d5ee3

Branch: refs/heads/master
Commit: e76d5ee33a3cda0d9ce495e21681f5bede25f129
Parents: 35cd2ab
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Apr 11 13:41:19 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Apr 11 13:41:19 2017 +0100

----------------------------------------------------------------------
 .../adapter/druid/DruidConnectionImpl.java      | 21 ++++++++++++++------
 .../calcite/adapter/druid/DruidQuery.java       |  5 ++---
 .../org/apache/calcite/test/DruidAdapterIT.java | 16 +++++++--------
 3 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index c736098..a1975eb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -270,7 +270,20 @@ class DruidConnectionImpl implements DruidConnection {
 
     // Move to next token, which is name's value
     JsonToken token = parser.nextToken();
-    if (fieldName.equals(DEFAULT_RESPONSE_TIMESTAMP_COLUMN)) {
+
+    boolean isTimestampColumn = fieldName.equals(DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
+    int i = fieldNames.indexOf(fieldName);
+    ColumnMetaData.Rep type = null;
+    if (i < 0) {
+      if (!isTimestampColumn) {
+        // Field not present
+        return;
+      }
+    } else {
+      type = fieldTypes.get(i);
+    }
+
+    if (isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP.equals(type)) {
       try {
         final Date parse;
         // synchronized block to avoid race condition
@@ -285,11 +298,7 @@ class DruidConnectionImpl implements DruidConnection {
       }
       return;
     }
-    int i = fieldNames.indexOf(fieldName);
-    if (i < 0) {
-      return;
-    }
-    ColumnMetaData.Rep type = fieldTypes.get(i);
+
     switch (token) {
     case VALUE_NUMBER_INT:
       if (type == null) {

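The hunk above changes how a field in the Druid JSON response is recognized as a timestamp: instead of matching only the default response timestamp column by name, any column whose declared representation is JAVA_SQL_TIMESTAMP (for example a time-extraction dimension returned under a generated output name such as 'extract_0') is now parsed as a date as well. A minimal sketch of that check, with illustrative names and the constant value assumed to be "timestamp" (not part of the patch):

    import java.util.List;

    import org.apache.calcite.avatica.ColumnMetaData;

    /** Sketch only; condenses the decision made while parsing a response field. */
    class TimestampFieldCheck {
      // DruidConnectionImpl's constant; value assumed here
      private static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp";

      /** Returns whether a response field should be parsed as a timestamp. */
      static boolean parseAsTimestamp(String fieldName, List<String> fieldNames,
          List<ColumnMetaData.Rep> fieldTypes) {
        final boolean isTimestampColumn =
            fieldName.equals(DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
        final int i = fieldNames.indexOf(fieldName);
        // Unknown fields that are not the timestamp column are skipped by the caller.
        final ColumnMetaData.Rep type = i < 0 ? null : fieldTypes.get(i);
        return isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP.equals(type);
      }
    }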
http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index a6c1fa9..1b81f67 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -1081,10 +1081,9 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     }
 
     private ColumnMetaData.Rep getPrimitive(RelDataTypeField field) {
-      if (field.getName().equals(query.druidTable.timestampFieldName)) {
-        return ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP;
-      }
       switch (field.getType().getSqlTypeName()) {
+      case TIMESTAMP:
+        return ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP;
       case BIGINT:
         return ColumnMetaData.Rep.LONG;
       case INTEGER:

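Previously getPrimitive returned JAVA_SQL_TIMESTAMP only when the field name matched query.druidTable.timestampFieldName, so a projected or extracted time column exposed under another name fell through to the generic cases. Keying on the field's SQL type instead means every TIMESTAMP column maps to JAVA_SQL_TIMESTAMP, which is what the DruidConnectionImpl change above relies on when parsing the response.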
http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index db6f8f8..904ecab 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1506,18 +1506,18 @@ public class DruidAdapterIT {
   }
 
   @Test public void testPushAggregateOnTime() {
-    String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" where "
-        + "\"product_id\" = 1016 and "
-        + "\"timestamp\" < cast('1997-01-03' as timestamp) and \"timestamp\" > cast"
-        + "('1990-01-01' as timestamp)" + " group by"
-        + "\"timestamp\", \"product_id\" ";
+    String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" "
+        + "where \"product_id\" = 1016 "
+        + "and \"timestamp\" < cast('1997-01-03' as timestamp) "
+        + "and \"timestamp\" > cast('1990-01-01' as timestamp) "
+        + "group by \"timestamp\", \"product_id\" ";
     String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
         + "'granularity':'all','dimensions':[{'type':'extraction',"
-        + "'dimension':'__time','outputName':'timestamp',"
+        + "'dimension':'__time','outputName':'extract_0',"
         + "'extractionFn':{'type':'timeFormat','format':'yyyy-MM-dd";
     sql(sql)
-        .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00");
   }
 
   @Test public void testPushAggregateOnTimeWithExtractYear() {


[2/2] calcite git commit: [CALCITE-1749] Push filter conditions partially into Druid

Posted by jc...@apache.org.
[CALCITE-1749] Push filter conditions partially into Druid
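In outline, DruidFilterRule no longer gives up when the whole filter condition cannot be expressed in Druid: it splits the (simplified) condition into conjunctions, pushes the conjuncts Druid can evaluate, and re-applies the remainder as a Filter on top of the DruidQuery. A condensed sketch of that flow, using only APIs visible in the diff below; the RexSimplify pass, interval derivation and the timestamp handling done by splitFilters are omitted, and the method name pushFilterPartially is illustrative:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.calcite.plan.RelOptUtil;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.core.Filter;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.rex.RexUtil;
    import org.apache.calcite.util.Util;

    void pushFilterPartially(RelOptRuleCall call) {
      final Filter filter = call.rel(0);
      final DruidQuery query = call.rel(1);
      final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();

      // Classify each conjunct: can Druid evaluate it?
      final List<RexNode> pushable = new ArrayList<>();
      final List<RexNode> residual = new ArrayList<>();
      for (RexNode e : RelOptUtil.conjunctions(filter.getCondition())) {
        (query.isValidFilter(e) ? pushable : residual).add(e);
      }
      if (pushable.isEmpty()) {
        return; // nothing useful to push to Druid
      }

      // Push the supported conjuncts into the Druid query ...
      RelNode newQuery = DruidQuery.extendQuery(query,
          filter.copy(filter.getTraitSet(), Util.last(query.rels),
              RexUtil.composeConjunction(rexBuilder, pushable, false)));

      // ... and keep the unsupported conjuncts in a Filter above it.
      if (!residual.isEmpty()) {
        newQuery = call.builder().push(newQuery).filter(residual).build();
      }
      call.transformTo(newQuery);
    }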


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/b48f634f
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/b48f634f
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/b48f634f

Branch: refs/heads/master
Commit: b48f634fe0c90f68c7c047a387814557e5701fa7
Parents: e76d5ee
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Apr 11 13:43:01 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Apr 11 13:43:01 2017 +0100

----------------------------------------------------------------------
 druid/pom.xml                                   |  22 ++--
 .../calcite/adapter/druid/DruidRules.java       | 102 +++++++++++++------
 .../org/apache/calcite/test/DruidAdapterIT.java |  28 +++++
 3 files changed, 111 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/pom.xml
----------------------------------------------------------------------
diff --git a/druid/pom.xml b/druid/pom.xml
index 7e433cd..362da10 100644
--- a/druid/pom.xml
+++ b/druid/pom.xml
@@ -57,6 +57,10 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
@@ -86,15 +90,15 @@ limitations under the License.
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
-      <dependency>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-      </dependency>
-      <dependency>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-          <scope>test</scope>
-      </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>

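The commons-lang3 dependency added here supplies the Triple/ImmutableTriple types used by the reworked filter rule in DruidRules.java below; the slf4j entries are only re-indented, not otherwise changed.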
http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 8f088bb..36e5b40 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -51,11 +51,15 @@ import org.apache.calcite.runtime.PredicateImpl;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Triple;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -158,11 +162,27 @@ public class DruidRules {
       final Filter filter = call.rel(0);
       final DruidQuery query = call.rel(1);
       final RelOptCluster cluster = filter.getCluster();
+      final RelBuilder relBuilder = call.builder();
       final RexBuilder rexBuilder = cluster.getRexBuilder();
-      if (!DruidQuery.isValidSignature(query.signature() + 'f')
-              || !query.isValidFilter(filter.getCondition())) {
+
+      if (!DruidQuery.isValidSignature(query.signature() + 'f')) {
         return;
       }
+
+      final List<RexNode> validPreds = new ArrayList<>();
+      final List<RexNode> nonValidPreds = new ArrayList<>();
+      final RexExecutor executor =
+          Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
+      final RexSimplify simplify = new RexSimplify(rexBuilder, true, executor);
+      final RexNode cond = simplify.simplify(filter.getCondition());
+      for (RexNode e : RelOptUtil.conjunctions(cond)) {
+        if (query.isValidFilter(e)) {
+          validPreds.add(e);
+        } else {
+          nonValidPreds.add(e);
+        }
+      }
+
       // Timestamp
       int timestampFieldIdx = -1;
       for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
@@ -172,71 +192,89 @@ public class DruidRules {
           break;
         }
       }
-      final RexExecutor executor =
-          Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
-      final RexSimplify simplify = new RexSimplify(rexBuilder, true, executor);
-      final Pair<List<RexNode>, List<RexNode>> pair =
-          splitFilters(rexBuilder, query,
-              simplify.simplify(filter.getCondition()), timestampFieldIdx);
-      if (pair == null) {
+
+      final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple =
+          splitFilters(rexBuilder, query, validPreds, nonValidPreds, timestampFieldIdx);
+      if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) {
         // We can't push anything useful to Druid.
         return;
       }
+      final List<RexNode> residualPreds = new ArrayList<>(triple.getRight());
       List<LocalInterval> intervals = null;
-      if (!pair.left.isEmpty()) {
+      if (!triple.getLeft().isEmpty()) {
         intervals = DruidDateTimeUtils.createInterval(
             query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
-            RexUtil.composeConjunction(rexBuilder, pair.left, false));
+            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false));
         if (intervals == null) {
           // We can't push anything useful to Druid.
-          return;
+          residualPreds.addAll(triple.getLeft());
         }
       }
-      DruidQuery newDruidQuery = query;
-      if (!pair.right.isEmpty()) {
+      if (intervals == null && triple.getMiddle().isEmpty()) {
+        // We can't push anything useful to Druid.
+        return;
+      }
+      RelNode newDruidQuery = query;
+      if (!triple.getMiddle().isEmpty()) {
         final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
-            RexUtil.composeConjunction(rexBuilder, pair.right, false));
+            RexUtil.composeConjunction(rexBuilder, triple.getMiddle(), false));
         newDruidQuery = DruidQuery.extendQuery(query, newFilter);
       }
       if (intervals != null) {
-        newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals);
+        newDruidQuery = DruidQuery.extendQuery((DruidQuery) newDruidQuery, intervals);
+      }
+      if (!residualPreds.isEmpty()) {
+        newDruidQuery = relBuilder
+            .push(newDruidQuery)
+            .filter(residualPreds)
+            .build();
       }
       call.transformTo(newDruidQuery);
     }
 
-    /** Splits the filter condition in two groups: those that filter on the timestamp column
-     * and those that filter on other fields. */
-    private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder,
-        final DruidQuery input, RexNode cond, final int timestampFieldIdx) {
+    /**
+     * Given a list of conditions that use only operations supported by Druid
+     * and a list of conditions that use at least one unsupported operation,
+     * returns a triple that classifies the conditions into three groups:
+     * left) conditions that filter on the timestamp column,
+     * middle) conditions that can be pushed to Druid,
+     * right) conditions that cannot be pushed to Druid.
+     */
+    private static Triple<List<RexNode>, List<RexNode>, List<RexNode>> splitFilters(
+        final RexBuilder rexBuilder, final DruidQuery input, final List<RexNode> validPreds,
+        final List<RexNode> nonValidPreds, final int timestampFieldIdx) {
       final List<RexNode> timeRangeNodes = new ArrayList<>();
-      final List<RexNode> otherNodes = new ArrayList<>();
-      List<RexNode> conjs = RelOptUtil.conjunctions(cond);
-      if (conjs.isEmpty()) {
-        // We do not transform
-        return null;
-      }
+      final List<RexNode> pushableNodes = new ArrayList<>();
+      final List<RexNode> nonPushableNodes = new ArrayList<>(nonValidPreds);
       // Number of columns with the dimensions and timestamp
-      for (RexNode conj : conjs) {
+      for (RexNode conj : validPreds) {
         final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
         conj.accept(visitor);
         if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
           if (visitor.inputPosReferenced.size() != 1) {
             // Complex predicate, transformation currently not supported
-            return null;
+            nonPushableNodes.add(conj);
+          } else {
+            timeRangeNodes.add(conj);
           }
-          timeRangeNodes.add(conj);
         } else {
+          boolean filterOnMetrics = false;
           for (Integer i : visitor.inputPosReferenced) {
             if (input.druidTable.metricFieldNames.contains(
                     input.getRowType().getFieldList().get(i).getName())) {
               // Filter on metrics, not supported in Druid
-              return null;
+              filterOnMetrics = true;
+              break;
             }
           }
-          otherNodes.add(conj);
+          if (filterOnMetrics) {
+            nonPushableNodes.add(conj);
+          } else {
+            pushableNodes.add(conj);
+          }
         }
       }
-      return Pair.of(timeRangeNodes, otherNodes);
+      return ImmutableTriple.of(timeRangeNodes, pushableNodes, nonPushableNodes);
     }
   }
 

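A note on the design change: splitFilters no longer returns null to abort the transformation. Conjuncts that mix the timestamp column with other columns, or that filter on metrics, now land in the right-hand (non-pushable) bucket, and the rule re-applies them as a residual Filter above the DruidQuery. The new testPushComplexFilter test below shows the effect: the bound filters on cases_per_pallet and units_per_case are pushed into the Druid query, while the EXTRACT(year ...) predicate stays in a BindableFilter on top.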
http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 904ecab..60952a7 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1689,6 +1689,34 @@ public class DruidAdapterIT {
         .returnsUnordered("EXPR$0=02; dayOfMonth=1016", "EXPR$0=10; dayOfMonth=1016",
             "EXPR$0=13; dayOfMonth=1016", "EXPR$0=16; dayOfMonth=1016");
   }
+
+  @Test public void testPushComplexFilter() {
+    String sql = "select sum(\"store_sales\") from \"foodmart\" "
+        + "where EXTRACT( year from \"timestamp\") = 1997 and "
+        + "\"cases_per_pallet\" >= 8 and \"cases_per_pallet\" <= 10 and "
+        + "\"units_per_case\" < 15 ";
+    String druidQuery = "{'queryType':'select','dataSource':'foodmart','descending':false,"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+        + "'filter':{'type':'and',"
+        + "'fields':[{'type':'bound','dimension':'cases_per_pallet','lower':'8',"
+        + "'lowerStrict':false,'ordering':'numeric'},"
+        + "{'type':'bound','dimension':'cases_per_pallet','upper':'10','upperStrict':false,"
+        + "'ordering':'numeric'},{'type':'bound','dimension':'units_per_case','upper':'15',"
+        + "'upperStrict':true,'ordering':'numeric'}]},'dimensions':[],'metrics':['store_sales'],"
+        + "'granularity':'all','pagingSpec':{'threshold':16384,'fromNext':true},"
+        + "'context':{'druid.query.fetch':false}}";
+    sql(sql)
+        .queryContains(druidChecker(druidQuery))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+             + "  BindableAggregate(group=[{}], EXPR$0=[SUM($1)])\n"
+             + "    BindableFilter(condition=[AND(>=(/INT(Reinterpret($0), 86400000), 1997-01-01), "
+             + "<(/INT(Reinterpret($0), 86400000), 1998-01-01))])\n"
+             + "      DruidQuery(table=[[foodmart, foodmart]], "
+             + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
+             + "filter=[AND(>=(CAST($11):BIGINT, 8), <=(CAST($11):BIGINT, 10), "
+             + "<(CAST($10):BIGINT, 15))], projects=[[$0, $90]])\n")
+        .returnsUnordered("EXPR$0=75364.09998679161");
+  }
 }
 
 // End DruidAdapterIT.java