You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2017/04/10 11:34:24 UTC

[1/2] calcite git commit: [CALCITE-1725] Push project aggregate of time extract to druid (Slim Bouguerra)

Repository: calcite
Updated Branches:
  refs/heads/master bff34c1a3 -> 04c0c8b6f


[CALCITE-1725] Push project aggregate of time extract to druid (Slim Bouguerra)

Close apache/calcite#412


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ef03b6e4
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ef03b6e4
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ef03b6e4

Branch: refs/heads/master
Commit: ef03b6e4c6dbae800e7c1aaa1deb5bbd7960e911
Parents: bff34c1
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Apr 6 12:32:27 2017 +0200
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon Apr 10 09:56:20 2017 +0100

----------------------------------------------------------------------
 .../adapter/druid/DefaultDimensionSpec.java     |  42 ++++
 .../calcite/adapter/druid/DimensionSpec.java    |  25 +++
 .../adapter/druid/DruidConnectionImpl.java      |   4 +-
 .../adapter/druid/DruidDateTimeUtils.java       |  10 +-
 .../calcite/adapter/druid/DruidQuery.java       | 101 +++++----
 .../calcite/adapter/druid/DruidRules.java       |  46 +++--
 .../adapter/druid/ExtractionDimensionSpec.java  |  53 +++++
 .../adapter/druid/ExtractionFunction.java       |  25 +++
 .../druid/TimeExtractionDimensionSpec.java      |  60 ++++++
 .../adapter/druid/TimeExtractionFunction.java   |  76 +++++++
 .../org/apache/calcite/test/DruidAdapterIT.java | 204 +++++++++++++++++--
 11 files changed, 566 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
new file mode 100644
index 0000000..1a92dec
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+
+/**
+ * Default implementation of Dimension spec
+ */
+public class DefaultDimensionSpec implements DimensionSpec {
+
+  private final String dimension;
+
+  public DefaultDimensionSpec(String dimension) {
+    this.dimension = dimension;
+  }
+
+  @Override public void write(JsonGenerator generator) throws IOException {
+    generator.writeStartObject();
+    generator.writeStringField("type", "default");
+    generator.writeStringField("dimension", dimension);
+    generator.writeEndObject();
+  }
+}
+
+// End DefaultDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
new file mode 100644
index 0000000..45e5936
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+/**
+ * Druid dimension spec interface
+ */
+public interface DimensionSpec extends DruidQuery.Json {
+}
+
+// End DimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index b9b860a..c736098 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -68,7 +68,7 @@ class DruidConnectionImpl implements DruidConnection {
   private final String url;
   private final String coordinatorUrl;
 
-  private static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp";
+  public static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp";
   private static final SimpleDateFormat UTC_TIMESTAMP_FORMAT;
 
   static {
@@ -239,7 +239,7 @@ class DruidConnectionImpl implements DruidConnection {
               if (posTimestampField != -1) {
                 rowBuilder.set(posTimestampField, timeValue);
               }
-              parseFields(fieldNames, fieldTypes, rowBuilder, parser);
+              parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
               sink.send(rowBuilder.build());
               rowBuilder.reset();
             }

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 709f8b3..39a1538 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -258,11 +258,15 @@ public class DruidDateTimeUtils {
    * @return the granularity, or null if it cannot be inferred
    */
   public static Granularity extractGranularity(RexCall call) {
-    if (call.getKind() != SqlKind.FLOOR
-        || call.getOperands().size() != 2) {
+    if ((call.getKind() != SqlKind.FLOOR && call.getKind() != SqlKind.EXTRACT)
+            || call.getOperands().size() != 2) {
       return null;
     }
-    final RexLiteral flag = (RexLiteral) call.operands.get(1);
+    int flagIndex = 1;
+    if (call.getKind() == SqlKind.EXTRACT) {
+      flagIndex = 0;
+    }
+    final RexLiteral flag = (RexLiteral) call.operands.get(flagIndex);
     final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
     if (timeUnit == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index f8211a4..2661a06 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -32,6 +32,7 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -78,6 +79,8 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.calcite.sql.SqlKind.INPUT_REF;
+
 /**
  * Relational expression representing a scan of a Druid data set.
  */
@@ -232,12 +235,12 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
   private boolean isValidCast(RexCall e, boolean boundedComparator) {
     assert e.isA(SqlKind.CAST);
-    if (e.getOperands().get(0).isA(SqlKind.INPUT_REF)
+    if (e.getOperands().get(0).isA(INPUT_REF)
         && e.getType().getFamily() == SqlTypeFamily.CHARACTER) {
       // CAST of input to character type
       return true;
     }
-    if (e.getOperands().get(0).isA(SqlKind.INPUT_REF)
+    if (e.getOperands().get(0).isA(INPUT_REF)
         && e.getType().getFamily() == SqlTypeFamily.NUMERIC
         && boundedComparator) {
       // CAST of input to numeric type, it is part of a bounded comparison
@@ -492,11 +495,12 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     // operators is more complex, since we need to extract
     // the conditions to know whether the query will be
     // executed as a Timeseries, TopN, or GroupBy in Druid
-    final List<String> dimensions = new ArrayList<>();
+    final List<DimensionSpec> dimensions = new ArrayList<>();
     final List<JsonAggregation> aggregations = new ArrayList<>();
     Granularity granularity = Granularity.ALL;
     Direction timeSeriesDirection = null;
     JsonLimit limit = null;
+    TimeExtractionDimensionSpec timeExtractionDimensionSpec = null;
     if (groupSet != null) {
       assert aggCalls != null;
       assert aggNames != null;
@@ -514,12 +518,14 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
             final String origin = druidTable.getRowType(getCluster().getTypeFactory())
                 .getFieldList().get(ref.getIndex()).getName();
             if (origin.equals(druidTable.timestampFieldName)) {
-              granularity = Granularity.NONE;
-              builder.add(s);
+              granularity = Granularity.ALL;
+              timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract();
+              dimensions.add(timeExtractionDimensionSpec);
+              builder.add(DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
               assert timePositionIdx == -1;
               timePositionIdx = groupKey;
             } else {
-              dimensions.add(s);
+              dimensions.add(new DefaultDimensionSpec(s));
               builder.add(s);
             }
           } else if (project instanceof RexCall) {
@@ -529,11 +535,21 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
                 DruidDateTimeUtils.extractGranularity(call);
             if (funcGranularity != null) {
               granularity = funcGranularity;
-              builder.add(s);
-              assert timePositionIdx == -1;
-              timePositionIdx = groupKey;
+              if (call.getKind().equals(SqlKind.EXTRACT)) {
+                // case extract on time
+                timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeExtract(granularity);
+                builder.add(timeExtractionDimensionSpec.getOutputName());
+                dimensions.add(timeExtractionDimensionSpec);
+                granularity = Granularity.ALL;
+              } else {
+                // case floor by granularity
+                builder.add(s);
+                assert timePositionIdx == -1;
+                timePositionIdx = groupKey;
+              }
+
             } else {
-              dimensions.add(s);
+              dimensions.add(new DefaultDimensionSpec(s));
               builder.add(s);
             }
           } else {
@@ -544,12 +560,14 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         for (int groupKey : groupSet) {
           final String s = fieldNames.get(groupKey);
           if (s.equals(druidTable.timestampFieldName)) {
-            granularity = Granularity.NONE;
-            builder.add(s);
+            granularity = Granularity.ALL;
+            timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract();
+            builder.add(DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
+            dimensions.add(timeExtractionDimensionSpec);
             assert timePositionIdx == -1;
             timePositionIdx = groupKey;
           } else {
-            dimensions.add(s);
+            dimensions.add(new DefaultDimensionSpec(s));
             builder.add(s);
           }
         }
@@ -611,6 +629,12 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     try {
       final JsonGenerator generator = factory.createGenerator(sw);
 
+      if (aggregations.isEmpty()) {
+        // Druid requires at least one aggregation, otherwise gives:
+        //   Must have at least one AggregatorFactory
+        aggregations.add(
+                new JsonAggregation("longSum", "dummy_agg", "dummy_agg"));
+      }
       switch (queryType) {
       case TIMESERIES:
         generator.writeStartObject();
@@ -640,7 +664,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         generator.writeStringField("queryType", "topN");
         generator.writeStringField("dataSource", druidTable.dataSource);
         generator.writeStringField("granularity", granularity.value);
-        generator.writeStringField("dimension", dimensions.get(0));
+        writeField(generator, "dimension", dimensions.get(0));
         generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0)));
         writeFieldIf(generator, "filter", jsonFilter);
         writeField(generator, "aggregations", aggregations);
@@ -653,14 +677,6 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
       case GROUP_BY:
         generator.writeStartObject();
-
-        if (aggregations.isEmpty()) {
-          // Druid requires at least one aggregation, otherwise gives:
-          //   Must have at least one AggregatorFactory
-          aggregations.add(
-              new JsonAggregation("longSum", "dummy_agg", "dummy_agg"));
-        }
-
         generator.writeStringField("queryType", "groupBy");
         generator.writeStringField("dataSource", druidTable.dataSource);
         generator.writeStringField("granularity", granularity.value);
@@ -887,35 +903,38 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     }
 
     String translate(RexNode e, boolean set) {
+      int index = -1;
       switch (e.getKind()) {
       case INPUT_REF:
         final RexInputRef ref = (RexInputRef) e;
-        final String fieldName =
-            rowType.getFieldList().get(ref.getIndex()).getName();
-        if (set) {
-          if (druidTable.metricFieldNames.contains(fieldName)) {
-            metrics.add(fieldName);
-          } else if (!druidTable.timestampFieldName.equals(fieldName)
-              && !DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
-            dimensions.add(fieldName);
-          }
-        }
-        return fieldName;
-
+        index = ref.getIndex();
+        break;
       case CAST:
-        return tr(e, 0, set);
-
       case LITERAL:
         return ((RexLiteral) e).getValue3().toString();
-
       case FLOOR:
         final RexCall call = (RexCall) e;
         assert DruidDateTimeUtils.extractGranularity(call) != null;
-        return tr(call, 0, set);
-
-      default:
+        index = RelOptUtil.InputFinder.bits(e).asList().get(0);
+        break;
+      case EXTRACT:
+        final RexCall extractCall = (RexCall) e;
+        assert DruidDateTimeUtils.extractGranularity(extractCall) != null;
+        index = RelOptUtil.InputFinder.bits(e).asList().get(0);
+      }
+      if (index == -1) {
         throw new AssertionError("invalid expression " + e);
       }
+      final String fieldName = rowType.getFieldList().get(index).getName();
+      if (set) {
+        if (druidTable.metricFieldNames.contains(fieldName)) {
+          metrics.add(fieldName);
+        } else if (!druidTable.timestampFieldName.equals(fieldName)
+            && !DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+          dimensions.add(fieldName);
+        }
+      }
+      return fieldName;
     }
 
     private JsonFilter translateFilter(RexNode e) {
@@ -1070,7 +1089,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
   /** Object that knows how to write itself to a
    * {@link com.fasterxml.jackson.core.JsonGenerator}. */
-  private interface Json {
+  public interface Json {
     void write(JsonGenerator generator) throws IOException;
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 0bc6b34..1170b0c 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -48,6 +48,7 @@ import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.runtime.PredicateImpl;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -414,11 +415,12 @@ public class DruidRules {
     }
 
     /* To be a valid Project, we allow it to contain references, and a single call
-     * to an FLOOR function on the timestamp column. Returns the reference to
-     * the timestamp, if any. */
+     * to an FLOOR function on the timestamp column OR Valid time extract on the top of time column
+     * Returns the reference to the timestamp, if any. */
     private static int validProject(Project project, DruidQuery query) {
       List<RexNode> nodes = project.getProjects();
       int idxTimestamp = -1;
+      boolean hasFloor = false;
       for (int i = 0; i < nodes.size(); i++) {
         final RexNode e = nodes.get(i);
         if (e instanceof RexCall) {
@@ -427,19 +429,33 @@ public class DruidRules {
           if (DruidDateTimeUtils.extractGranularity(call) == null) {
             return -1;
           }
-          if (idxTimestamp != -1) {
+          if (idxTimestamp != -1 && hasFloor) {
             // Already one usage of timestamp column
             return -1;
           }
-          if (!(call.getOperands().get(0) instanceof RexInputRef)) {
-            return -1;
-          }
-          final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
-          if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
-                  query.getTopNode(), query))) {
-            return -1;
+          if (call.getKind() == SqlKind.FLOOR) {
+            hasFloor = true;
+            if (!(call.getOperands().get(0) instanceof RexInputRef)) {
+              return -1;
+            }
+            final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
+            if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode(),
+                query
+            ))) {
+              return -1;
+            }
+            idxTimestamp = i;
+          } else {
+            RexInputRef ref;
+            // Case extract from Calcite EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0),86400000))
+            if (call.getOperands().get(1) instanceof RexCall) {
+              RexCall refCall = (RexCall) call.getOperands().get(1);
+              ref = (RexInputRef) ((RexCall) refCall.getOperands().get(0)).getOperands().get(0);
+            } else {
+              ref = (RexInputRef) call.getOperands().get(1);
+            }
+            idxTimestamp = ref.getIndex();
           }
-          idxTimestamp = i;
           continue;
         }
         if (!(e instanceof RexInputRef)) {
@@ -615,12 +631,8 @@ public class DruidRules {
       final Project project = (Project) topProject;
       for (int index : set) {
         RexNode node = project.getProjects().get(index);
-        if (node instanceof RexInputRef) {
-          newSet.set(((RexInputRef) node).getIndex());
-        } else if (node instanceof RexCall) {
-          RexCall call = (RexCall) node;
-          newSet.set(((RexInputRef) call.getOperands().get(0)).getIndex());
-        }
+        ImmutableBitSet setOfBits = RelOptUtil.InputFinder.bits(node);
+        newSet.addAll(setOfBits);
       }
       set = newSet.build();
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
new file mode 100644
index 0000000..c35cfb8
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+
+import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
+
+/**
+ * Extraction function dimension spec implementation
+ */
+public class ExtractionDimensionSpec implements DimensionSpec {
+  private final String dimension;
+  private final ExtractionFunction extractionFunction;
+  private final String outputName;
+
+  public ExtractionDimensionSpec(String dimension, ExtractionFunction extractionFunction,
+          String outputName
+  ) {
+    this.dimension = dimension;
+    this.extractionFunction = extractionFunction;
+    this.outputName = outputName;
+  }
+
+  @Override public void write(JsonGenerator generator) throws IOException {
+
+    generator.writeStartObject();
+    generator.writeStringField("type", "extraction");
+    generator.writeStringField("dimension", dimension);
+    writeFieldIf(generator, "outputName", outputName);
+    writeFieldIf(generator, "extractionFn", extractionFunction);
+    generator.writeEndObject();
+  }
+
+}
+
+// End ExtractionDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
new file mode 100644
index 0000000..cecca03
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+/**
+ * Extraction Function interface
+ */
+public interface ExtractionFunction extends DruidQuery.Json {
+}
+
+// End ExtractionFunction.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
new file mode 100644
index 0000000..26cea49
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+/**
+ * Time extraction dimension spec implementation
+ */
+public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
+  private final String outputName;
+
+  public TimeExtractionDimensionSpec(ExtractionFunction extractionFunction,
+          String outputName
+  ) {
+    super("__time", extractionFunction, outputName);
+    this.outputName = outputName;
+  }
+
+  public static TimeExtractionDimensionSpec makeFullTimeExtract() {
+    return new TimeExtractionDimensionSpec(
+            TimeExtractionFunction.createDefault(),
+            DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN
+    );
+  }
+
+  public String getOutputName() {
+    return outputName;
+  }
+
+  public static TimeExtractionDimensionSpec makeExtract(Granularity granularity) {
+    switch (granularity) {
+    case YEAR:
+      return new TimeExtractionDimensionSpec(
+              TimeExtractionFunction.createFromGranularity(granularity), "year");
+    case MONTH:
+      return new TimeExtractionDimensionSpec(
+              TimeExtractionFunction.createFromGranularity(granularity), "monthOfYear");
+    case DAY:
+      return new TimeExtractionDimensionSpec(
+            TimeExtractionFunction.createFromGranularity(granularity), "dayOfMonth");
+    default:
+      return null;
+    }
+  }
+}
+
+// End TimeExtractionDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
new file mode 100644
index 0000000..01ea2a7
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+
+import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
+
+/**
+ * Time extraction implementation
+ */
+public class TimeExtractionFunction implements ExtractionFunction {
+
+  private final String format;
+
+  private final String granularity;
+
+  private final String timeZone;
+
+  private final String local;
+
+  public TimeExtractionFunction(String format, String granularity, String timeZone, String local) {
+    this.format = format;
+    this.granularity = granularity;
+    this.timeZone = timeZone;
+    this.local = local;
+  }
+
+  @Override public void write(JsonGenerator generator) throws IOException {
+
+    generator.writeStartObject();
+    generator.writeStringField("type", "timeFormat");
+    writeFieldIf(generator, "format", format);
+    writeFieldIf(generator, "granularity", granularity);
+    writeFieldIf(generator, "timeZone", timeZone);
+    writeFieldIf(generator, "local", local);
+    generator.writeEndObject();
+  }
+
+  public static TimeExtractionFunction createDefault() {
+    return new TimeExtractionFunction("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", null, "UTC", null);
+  }
+
+  public static TimeExtractionFunction createFromGranularity(Granularity granularity) {
+    switch (granularity) {
+    case DAY:
+      return new TimeExtractionFunction("dd", null, "UTC", null);
+    case MONTH:
+      return new TimeExtractionFunction("MM", null, "UTC", null);
+    case YEAR:
+      return new TimeExtractionFunction("yyyy", null, "UTC", null);
+    case HOUR:
+      return new TimeExtractionFunction("hh", null, "UTC", null);
+    default:
+      throw new AssertionError("Extraction " + granularity.value + " is not valid");
+    }
+  }
+}
+
+// End TimeExtractionFunction.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ef03b6e4/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index e673c9c..7cc821f 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -164,7 +164,7 @@ public class DruidAdapterIT {
         + "filter=[=($17, 'Jeremy Corbyn')], groups=[{7}], aggs=[[]])\n";
     final String druidQuery = "{'queryType':'groupBy',"
         + "'dataSource':'wikiticker','granularity':'all',"
-        + "'dimensions':['countryName'],'limitSpec':{'type':'default'},"
+        + "'dimensions':[{'type':'default','dimension':'countryName'}],'limitSpec':{'type':'default'},"
         + "'filter':{'type':'selector','dimension':'page','value':'Jeremy Corbyn'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-01T00:00:00.000/3000-01-01T00:00:00.000']}";
@@ -252,7 +252,8 @@ public class DruidAdapterIT {
         + "    BindableProject(s=[$2], page=[$0], day=[$1])\n"
         + "      DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$17, FLOOR($0, FLAG(DAY)), $1]], groups=[{0, 1}], aggs=[[SUM($2)]])\n";
     final String druidQuery = "{'queryType':'groupBy',"
-        + "'dataSource':'wikiticker','granularity':'day','dimensions':['page'],"
+        + "'dataSource':'wikiticker','granularity':'day',\"dimensions\":[{\"type\":\"default\","
+            + "\"dimension\":\"page\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'s','fieldName':'added'}],"
         + "'intervals':['1900-01-01T00:00:00.000/3000-01-01T00:00:00.000']}";
@@ -288,7 +289,7 @@ public class DruidAdapterIT {
         + "where \"page\" = 'Jeremy Corbyn'";
     final String druidQuery = "{'queryType':'groupBy',"
         + "'dataSource':'wikiticker','granularity':'all',"
-        + "'dimensions':['countryName'],'limitSpec':{'type':'default'},"
+        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"countryName\"}],'limitSpec':{'type':'default'},"
         + "'filter':{'type':'selector','dimension':'page','value':'Jeremy Corbyn'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -385,7 +386,7 @@ public class DruidAdapterIT {
         + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], groups=[{30}], aggs=[[]])";
     final String sql = "select distinct \"state_province\" from \"foodmart\"";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
-        + "'dimensions':['state_province'],'limitSpec':{'type':'default'},"
+        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"}],'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     sql(sql)
@@ -441,7 +442,8 @@ public class DruidAdapterIT {
     final String sql = "select \"product_id\" from \"foodmart\" where "
             + "\"product_id\" = 1020 group by \"product_id\"";
     final String druidQuery = "{\"queryType\":\"groupBy\",\"dataSource\":\"foodmart\","
-            + "\"granularity\":\"all\",\"dimensions\":[\"product_id\"],"
+            + "\"granularity\":\"all\",\"dimensions\":[{\"type\":\"default\","
+            + "\"dimension\":\"product_id\"}],"
             + "\"limitSpec\":{\"type\":\"default\"},\"filter\":{\"type\":\"selector\","
             + "\"dimension\":\"product_id\",\"value\":\"1020\"},"
             + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"dummy_agg\","
@@ -459,7 +461,8 @@ public class DruidAdapterIT {
             .queryContains(
                     druidChecker("{\"queryType\":\"groupBy"
             + "\",\"dataSource\":\"foodmart\",\"granularity\":\"all\","
-            + "\"dimensions\":[\"product_id\"],\"limitSpec\":{\"type\":\"default\"},"
+            + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"product_id\"}],"
+            + "\"limitSpec\":{\"type\":\"default\"},"
             + "\"filter\":{\"type\":\"selector\",\"dimension\":\"product_id\","
             + "\"value\":\"1020\"},\"aggregations\":[{\"type\":\"longSum\","
             + "\"name\":\"dummy_agg\",\"fieldName\":\"dummy_agg\"}],"
@@ -554,7 +557,8 @@ public class DruidAdapterIT {
     final String sql = "select distinct \"gender\", \"state_province\"\n"
         + "from \"foodmart\" fetch next 3 rows only";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all','dimensions':['gender','state_province'],'limitSpec':{'type':'default'},"
+        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\",\"dimension\":\"gender\"},"
+            + "{\"type\":\"default\",\"dimension\":\"state_province\"}],'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     final String explain = "PLAN="
@@ -576,7 +580,8 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", \"gender\"\n"
         + "order by s desc limit 3";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all','dimensions':['brand_name','gender'],"
+        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\","
+            + "\"dimension\":\"brand_name\"},{\"type\":\"default\",\"dimension\":\"gender\"}],"
         + "'limitSpec':{'type':'default','limit':3,'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -613,12 +618,13 @@ public class DruidAdapterIT {
         + "group by \"brand_name\"\n"
         + "order by s desc limit 3";
     final String approxDruid = "{'queryType':'topN','dataSource':'foodmart',"
-        + "'granularity':'all','dimension':'brand_name','metric':'S',"
+        + "'granularity':'all',\"dimension\":{\"type\":\"default\",\"dimension\":\"brand_name\"},"
+            + "'metric':'S',"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
         + "'threshold':3}";
     final String exactDruid = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all','dimensions':['brand_name'],"
+        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
         + "'limitSpec':{'type':'default','limit':3,"
         + "'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
@@ -654,7 +660,7 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':['brand_name'],"
+        + "'granularity':'day','dimensions':[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -688,7 +694,7 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':['brand_name'],"
+        + "'granularity':'day','dimensions':[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -717,7 +723,8 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by \"brand_name\"";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':['brand_name'],"
+        + "'granularity':'day','dimensions':[{\"type\":\"default\","
+            + "\"dimension\":\"brand_name\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -1189,7 +1196,8 @@ public class DruidAdapterIT {
         + "    BindableProject(S=[$2], M=[$3], P=[$0])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$30, FLOOR($0, FLAG(MONTH)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]])";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'month','dimensions':['state_province'],"
+        + "'granularity':'month','dimensions':[{\"type\":\"default\","
+            + "\"dimension\":\"state_province\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
         + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
@@ -1220,7 +1228,8 @@ public class DruidAdapterIT {
         + "projects=[[$30, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
         + "aggs=[[SUM($2), MAX($2)]]";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':['state_province'],"
+        + "'granularity':'day','dimensions':[{\"type\":\"default\","
+            + "\"dimension\":\"state_province\"}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
         + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
@@ -1286,7 +1295,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{\"queryType\":\"groupBy\","
                         + "\"dataSource\":\"foodmart\","
                         + "\"granularity\":\"all\","
-                        + "\"dimensions\":[\"state_province\"],"
+                        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"}],"
                         + "\"limitSpec\":{\"type\":\"default\"},"
                         + "\"aggregations\":[{\"type\":\"cardinality\",\"name\":\"$f1\",\"fieldNames\":[\"city\"]}],"
                         + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000\"]}";
@@ -1324,7 +1333,9 @@ public class DruidAdapterIT {
         + "and \"quarter\" in ('Q2', 'Q3')\n"
         + "and \"state_province\" = 'WA'";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
-        + "'dimensions':['state_province','city','product_name'],'limitSpec':{'type':'default'},"
+        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"},"
+            + "{\"type\":\"default\",\"dimension\":\"city\"},{\"type\":\"default\","
+            + "\"dimension\":\"product_name\"}],'limitSpec':{'type':'default'},"
         + "'filter':{'type':'and','fields':[{'type':'selector','dimension':'product_name',"
         + "'value':'High Top Dried Mushrooms'},{'type':'or','fields':[{'type':'selector',"
         + "'dimension':'quarter','value':'Q2'},{'type':'selector','dimension':'quarter',"
@@ -1504,6 +1515,165 @@ public class DruidAdapterIT {
             "\"upper\":\"12223\""));
   }
 
+  @Test public void testPushAggragateOnTime() {
+    String sql =
+            "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" where "
+                    + "\"product_id\" = 1016 and "
+                    + "\"timestamp\" < cast('1997-01-03' as timestamp) and \"timestamp\" > cast"
+                    + "('1990-01-01' as timestamp)" + " group by"
+                    + "\"timestamp\", \"product_id\" ";
+
+    String druidQuery = "{\"queryType\":\"groupBy\",\"dataSource\":\"foodmart\","
+            + "\"granularity\":\"all\",\"dimensions\":[{\"type\":\"extraction\","
+            + "\"dimension\":\"__time\",\"outputName\":\"timestamp\","
+            + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy-MM-dd";
+    sql(sql).returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
+            .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testPushAggragateOnTimeWithExtractYear() {
+    String sql =
+            "select EXTRACT( year from \"timestamp\") as \"year\",\"product_id\"  from "
+                    + "\"foodmart\" where \"product_id\" = 1016 and "
+                    + "\"timestamp\" < cast('1999-01-02' as timestamp) and \"timestamp\" > cast"
+                    + "('1997-01-01' as timestamp)" + " group by "
+                    + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(druidChecker
+      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+              + "\"dimension\":\"__time\",\"outputName\":\"year\","
+              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
+              + "\"timeZone\":\"UTC\"}}"
+      )
+    ).returnsUnordered("year=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggragateOnTimeWithExtractMonth() {
+    String sql =
+            "select EXTRACT( month from \"timestamp\") as \"month\",\"product_id\"  from "
+                    + "\"foodmart\" where \"product_id\" = 1016 and "
+                    + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+                    + "('1997-01-01' as timestamp)" + " group by "
+                    + " EXTRACT( month from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(druidChecker
+      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+              + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
+              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
+              + "\"timeZone\":\"UTC\"}}"
+      )
+    ).returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
+            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016"
+    );
+  }
+
+  @Test public void testPushAggragateOnTimeWithExtractDay() {
+    String sql =
+            "select EXTRACT( day from \"timestamp\") as \"day\",\"product_id\"  from \"foodmart\""
+                    + " where \"product_id\" = 1016 and "
+                    + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+                    + "('1997-01-01' as timestamp)" + " group by "
+                    + " EXTRACT( day from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(druidChecker
+      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+              + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
+              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
+              + "\"timeZone\":\"UTC\"}}"
+      )
+    ).returnsUnordered("day=02; product_id=1016", "day=10; product_id=1016",
+            "day=13; product_id=1016", "day=16; product_id=1016"
+    );
+  }
+
+  //Calcite rewrite this push as
+  // rel#85:BindableProject.BINDABLE.[](input=rel#69:Subset#1.BINDABLE.[],
+  // hourOfDay=/INT(MOD(Reinterpret($0), 86400000), 3600000),product_id=$1)
+  // hence not sure if that is valid for hive as well.
+  @Ignore @Test public void testPushAggragateOnTimeWithExtractHourOfDay() {
+    String sql =
+            "select EXTRACT( hour from \"timestamp\") as \"hourOfDay\",\"product_id\"  from "
+                    + "\"foodmart\" where \"product_id\" = 1016 and "
+                    + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+                    + "('1997-01-01' as timestamp)" + " group by "
+                    + " EXTRACT( hour from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(druidChecker
+      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+              + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
+              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
+              + "\"timeZone\":\"UTC\"}}"
+      )
+    ).returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
+            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016"
+    );
+  }
+
+  @Test public void testPushAggragateOnTimeWithExtractYearMonthDay() {
+    String sql = "select EXTRACT( day from \"timestamp\") as \"day\", EXTRACT( month from "
+            + "\"timestamp\") as \"month\",  EXTRACT( year from \"timestamp\") as \"year\",\""
+            + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+            + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+            + "('1997-01-01' as timestamp)"
+            + " group by "
+            + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+            + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(
+            druidChecker(",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
+                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
+                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"year\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
+                    + "\"timeZone\":\"UTC\"}}")).explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
+            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
+            + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
+            + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+            .returnsUnordered("day=02; month=01; year=1997; product_id=1016",
+                    "day=10; month=01; year=1997; product_id=1016",
+                    "day=13; month=01; year=1997; product_id=1016",
+                    "day=16; month=01; year=1997; product_id=1016"
+    );
+  }
+
+  @Test public void testPushAggragateOnTimeWithExtractYearMonthDayWithOutRenaming() {
+    String sql = "select EXTRACT( day from \"timestamp\"), EXTRACT( month from "
+            + "\"timestamp\"),  EXTRACT( year from \"timestamp\"),\""
+            + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+            + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+            + "('1997-01-01' as timestamp)"
+            + " group by "
+            + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+            + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+
+    sql(sql).queryContains(
+            druidChecker(",\"granularity\":\"all\"", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
+                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
+                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
+                    + "\"dimension\":\"__time\",\"outputName\":\"year\","
+                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
+                    + "\"timeZone\":\"UTC\"}}")).explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
+            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
+            + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
+            + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+            .returnsUnordered("EXPR$0=02; EXPR$1=01; EXPR$2=1997; product_id=1016",
+                    "EXPR$0=10; EXPR$1=01; EXPR$2=1997; product_id=1016",
+                    "EXPR$0=13; EXPR$1=01; EXPR$2=1997; product_id=1016",
+                    "EXPR$0=16; EXPR$1=01; EXPR$2=1997; product_id=1016"
+    );
+  }
 }
 
 // End DruidAdapterIT.java


[2/2] calcite git commit: [CALCITE-1725] Push project aggregate of time extract to druid

Posted by jc...@apache.org.
[CALCITE-1725] Push project aggregate of time extract to druid

* Fixes in Extract output column name to avoid possible name collisions.
* Handling CAST in translation.
* Style fixes.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/04c0c8b6
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/04c0c8b6
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/04c0c8b6

Branch: refs/heads/master
Commit: 04c0c8b6f212e513aba76a36e62f99e4bde4370e
Parents: ef03b6e
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Apr 10 12:26:00 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon Apr 10 12:28:11 2017 +0100

----------------------------------------------------------------------
 .../adapter/druid/DruidDateTimeUtils.java       |  12 +-
 .../calcite/adapter/druid/DruidQuery.java       |  45 +-
 .../calcite/adapter/druid/DruidRules.java       |   7 +-
 .../adapter/druid/ExtractionDimensionSpec.java  |   8 +-
 .../druid/TimeExtractionDimensionSpec.java      |  28 +-
 .../adapter/druid/TimeExtractionFunction.java   |   4 -
 .../org/apache/calcite/test/DruidAdapterIT.java | 461 ++++++++++---------
 7 files changed, 295 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 39a1538..89c68b3 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -250,9 +250,9 @@ public class DruidDateTimeUtils {
   }
 
   /**
-   * Extracts granularity from a call {@code FLOOR(<time> TO <timeunit>)}.
-   * Timeunit specifies the granularity. Returns null if it cannot
-   * be inferred.
+   * Infers granularity from a timeunit.
+   * It support {@code FLOOR(<time> TO <timeunit>)} and {@code EXTRACT(<timeunit> FROM <time>)}.
+   * It returns null if it cannot be inferred.
    *
    * @param call the function call
    * @return the granularity, or null if it cannot be inferred
@@ -262,9 +262,13 @@ public class DruidDateTimeUtils {
             || call.getOperands().size() != 2) {
       return null;
     }
-    int flagIndex = 1;
+    int flagIndex;
     if (call.getKind() == SqlKind.EXTRACT) {
+      // EXTRACT
       flagIndex = 0;
+    } else {
+      // FLOOR
+      flagIndex = 1;
     }
     final RexLiteral flag = (RexLiteral) call.operands.get(flagIndex);
     final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 2661a06..62ebe85 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -94,6 +94,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
   final ImmutableList<RelNode> rels;
 
   private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
+  private static final String EXTRACT_COLUMN_NAME_PREFIX = "extract";
   protected static final String DRUID_QUERY_FETCH = "druid.query.fetch";
 
   /**
@@ -507,6 +508,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       assert aggCalls.size() == aggNames.size();
 
       int timePositionIdx = -1;
+      int extractNumber = -1;
       final ImmutableList.Builder<String> builder = ImmutableList.builder();
       if (projects != null) {
         for (int groupKey : groupSet) {
@@ -519,9 +521,15 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
                 .getFieldList().get(ref.getIndex()).getName();
             if (origin.equals(druidTable.timestampFieldName)) {
               granularity = Granularity.ALL;
-              timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract();
+              // Generate unique name as timestampFieldName is taken
+              String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+              while (fieldNames.contains(extractColumnName)) {
+                extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+              }
+              timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
+                  extractColumnName);
               dimensions.add(timeExtractionDimensionSpec);
-              builder.add(DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
+              builder.add(extractColumnName);
               assert timePositionIdx == -1;
               timePositionIdx = groupKey;
             } else {
@@ -534,15 +542,21 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
             final Granularity funcGranularity =
                 DruidDateTimeUtils.extractGranularity(call);
             if (funcGranularity != null) {
-              granularity = funcGranularity;
               if (call.getKind().equals(SqlKind.EXTRACT)) {
                 // case extract on time
-                timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeExtract(granularity);
-                builder.add(timeExtractionDimensionSpec.getOutputName());
-                dimensions.add(timeExtractionDimensionSpec);
                 granularity = Granularity.ALL;
+                // Generate unique name as timestampFieldName is taken
+                String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+                while (fieldNames.contains(extractColumnName)) {
+                  extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+                }
+                timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeExtract(
+                    funcGranularity, extractColumnName);
+                dimensions.add(timeExtractionDimensionSpec);
+                builder.add(extractColumnName);
               } else {
                 // case floor by granularity
+                granularity = funcGranularity;
                 builder.add(s);
                 assert timePositionIdx == -1;
                 timePositionIdx = groupKey;
@@ -561,9 +575,15 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
           final String s = fieldNames.get(groupKey);
           if (s.equals(druidTable.timestampFieldName)) {
             granularity = Granularity.ALL;
-            timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract();
-            builder.add(DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
+            // Generate unique name as timestampFieldName is taken
+            String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+            while (fieldNames.contains(extractColumnName)) {
+              extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
+            }
+            timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
+                extractColumnName);
             dimensions.add(timeExtractionDimensionSpec);
+            builder.add(extractColumnName);
             assert timePositionIdx == -1;
             timePositionIdx = groupKey;
           } else {
@@ -902,7 +922,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       }
     }
 
-    String translate(RexNode e, boolean set) {
+    @SuppressWarnings("incomplete-switch") String translate(RexNode e, boolean set) {
       int index = -1;
       switch (e.getKind()) {
       case INPUT_REF:
@@ -910,17 +930,14 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         index = ref.getIndex();
         break;
       case CAST:
+        return tr(e, 0, set);
       case LITERAL:
         return ((RexLiteral) e).getValue3().toString();
       case FLOOR:
+      case EXTRACT:
         final RexCall call = (RexCall) e;
         assert DruidDateTimeUtils.extractGranularity(call) != null;
         index = RelOptUtil.InputFinder.bits(e).asList().get(0);
-        break;
-      case EXTRACT:
-        final RexCall extractCall = (RexCall) e;
-        assert DruidDateTimeUtils.extractGranularity(extractCall) != null;
-        index = RelOptUtil.InputFinder.bits(e).asList().get(0);
       }
       if (index == -1) {
         throw new AssertionError("invalid expression " + e);

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 1170b0c..8f088bb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -415,7 +415,7 @@ public class DruidRules {
     }
 
     /* To be a valid Project, we allow it to contain references, and a single call
-     * to an FLOOR function on the timestamp column OR Valid time extract on the top of time column
+     * to a FLOOR function on the timestamp column OR valid time EXTRACT on the timestamp column.
      * Returns the reference to the timestamp, if any. */
     private static int validProject(Project project, DruidQuery query) {
       List<RexNode> nodes = project.getProjects();
@@ -439,9 +439,8 @@ public class DruidRules {
               return -1;
             }
             final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
-            if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode(),
-                query
-            ))) {
+            if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
+                query.getTopNode(), query))) {
               return -1;
             }
             idxTimestamp = i;

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
index c35cfb8..45db374 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionDimensionSpec.java
@@ -31,15 +31,17 @@ public class ExtractionDimensionSpec implements DimensionSpec {
   private final String outputName;
 
   public ExtractionDimensionSpec(String dimension, ExtractionFunction extractionFunction,
-          String outputName
-  ) {
+      String outputName) {
     this.dimension = dimension;
     this.extractionFunction = extractionFunction;
     this.outputName = outputName;
   }
 
-  @Override public void write(JsonGenerator generator) throws IOException {
+  public String getOutputName() {
+    return outputName;
+  }
 
+  @Override public void write(JsonGenerator generator) throws IOException {
     generator.writeStartObject();
     generator.writeStringField("type", "extraction");
     generator.writeStringField("dimension", dimension);

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
index 26cea49..669c1c2 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
@@ -20,37 +20,29 @@ package org.apache.calcite.adapter.druid;
  * Time extraction dimension spec implementation
  */
 public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
-  private final String outputName;
 
-  public TimeExtractionDimensionSpec(ExtractionFunction extractionFunction,
-          String outputName
-  ) {
-    super("__time", extractionFunction, outputName);
-    this.outputName = outputName;
+  public TimeExtractionDimensionSpec(
+      ExtractionFunction extractionFunction, String outputName) {
+    super(DruidTable.DEFAULT_TIMESTAMP_COLUMN, extractionFunction, outputName);
   }
 
-  public static TimeExtractionDimensionSpec makeFullTimeExtract() {
+  public static TimeExtractionDimensionSpec makeFullTimeExtract(String outputName) {
     return new TimeExtractionDimensionSpec(
-            TimeExtractionFunction.createDefault(),
-            DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN
-    );
+        TimeExtractionFunction.createDefault(), outputName);
   }
 
-  public String getOutputName() {
-    return outputName;
-  }
-
-  public static TimeExtractionDimensionSpec makeExtract(Granularity granularity) {
+  public static TimeExtractionDimensionSpec makeExtract(
+      Granularity granularity, String outputName) {
     switch (granularity) {
     case YEAR:
       return new TimeExtractionDimensionSpec(
-              TimeExtractionFunction.createFromGranularity(granularity), "year");
+          TimeExtractionFunction.createFromGranularity(granularity), outputName);
     case MONTH:
       return new TimeExtractionDimensionSpec(
-              TimeExtractionFunction.createFromGranularity(granularity), "monthOfYear");
+          TimeExtractionFunction.createFromGranularity(granularity), outputName);
     case DAY:
       return new TimeExtractionDimensionSpec(
-            TimeExtractionFunction.createFromGranularity(granularity), "dayOfMonth");
+          TimeExtractionFunction.createFromGranularity(granularity), outputName);
     default:
       return null;
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
index 01ea2a7..abcedbe 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -28,11 +28,8 @@ import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
 public class TimeExtractionFunction implements ExtractionFunction {
 
   private final String format;
-
   private final String granularity;
-
   private final String timeZone;
-
   private final String local;
 
   public TimeExtractionFunction(String format, String granularity, String timeZone, String local) {
@@ -43,7 +40,6 @@ public class TimeExtractionFunction implements ExtractionFunction {
   }
 
   @Override public void write(JsonGenerator generator) throws IOException {
-
     generator.writeStartObject();
     generator.writeStringField("type", "timeFormat");
     writeFieldIf(generator, "format", format);

http://git-wip-us.apache.org/repos/asf/calcite/blob/04c0c8b6/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 7cc821f..62306f7 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -252,8 +252,8 @@ public class DruidAdapterIT {
         + "    BindableProject(s=[$2], page=[$0], day=[$1])\n"
         + "      DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$17, FLOOR($0, FLAG(DAY)), $1]], groups=[{0, 1}], aggs=[[SUM($2)]])\n";
     final String druidQuery = "{'queryType':'groupBy',"
-        + "'dataSource':'wikiticker','granularity':'day',\"dimensions\":[{\"type\":\"default\","
-            + "\"dimension\":\"page\"}],"
+        + "'dataSource':'wikiticker','granularity':'day',"
+        + "'dimensions':[{'type':'default','dimension':'page'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'s','fieldName':'added'}],"
         + "'intervals':['1900-01-01T00:00:00.000/3000-01-01T00:00:00.000']}";
@@ -289,7 +289,7 @@ public class DruidAdapterIT {
         + "where \"page\" = 'Jeremy Corbyn'";
     final String druidQuery = "{'queryType':'groupBy',"
         + "'dataSource':'wikiticker','granularity':'all',"
-        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"countryName\"}],'limitSpec':{'type':'default'},"
+        + "'dimensions':[{'type':'default','dimension':'countryName'}],'limitSpec':{'type':'default'},"
         + "'filter':{'type':'selector','dimension':'page','value':'Jeremy Corbyn'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -386,7 +386,7 @@ public class DruidAdapterIT {
         + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], groups=[{30}], aggs=[[]])";
     final String sql = "select distinct \"state_province\" from \"foodmart\"";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
-        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"}],'limitSpec':{'type':'default'},"
+        + "'dimensions':[{'type':'default','dimension':'state_province'}],'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     sql(sql)
@@ -419,36 +419,33 @@ public class DruidAdapterIT {
             + "    DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[=($1, 1020)],"
             + " projects=[[$90, $1]])\n";
-    sql(sql).explainContains(plan).
-    queryContains(
-            druidChecker("{\"queryType\":\"select\",\"dataSource\":\"foodmart\","
-                    + "\"descending\":false,"
-                    + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000"
-                    + "\"],\"filter\":{\"type\":\"selector\",\"dimension\":\"product_id\","
-                    + "\"value\":\"1020\"},\"dimensions\":[\"product_id\"],"
-                    + "\"metrics\":[\"store_sales\"],\"granularity\":\"all\","
-                    + "\"pagingSpec\":{\"threshold\":16384,\"fromNext\":true},"
-                    + "\"context\":{\"druid.query.fetch\":false}}")
-    )
-    .returnsUnordered("store_sales=0.5099999904632568; product_id=1020",
-                    "store_sales=1.0199999809265137; product_id=1020",
-                    "store_sales=1.5299999713897705; product_id=1020",
-                    "store_sales=2.0399999618530273; product_id=1020",
-                    "store_sales=2.549999952316284; product_id=1020"
-    );
+    final String druidQuery = "{'queryType':'select','dataSource':'foodmart','descending':false,"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'filter':{'type':'selector','dimension':'product_id','value':'1020'},"
+            + "'dimensions':['product_id'],'metrics':['store_sales'],'granularity':'all',"
+            + "'pagingSpec':{'threshold':16384,'fromNext':true},"
+            + "'context':{'druid.query.fetch':false}}";
+    sql(sql)
+        .explainContains(plan)
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("store_sales=0.5099999904632568; product_id=1020",
+            "store_sales=1.0199999809265137; product_id=1020",
+            "store_sales=1.5299999713897705; product_id=1020",
+            "store_sales=2.0399999618530273; product_id=1020",
+            "store_sales=2.549999952316284; product_id=1020");
   }
 
   @Test public void testPushSimpleGroupBy() {
     final String sql = "select \"product_id\" from \"foodmart\" where "
             + "\"product_id\" = 1020 group by \"product_id\"";
-    final String druidQuery = "{\"queryType\":\"groupBy\",\"dataSource\":\"foodmart\","
-            + "\"granularity\":\"all\",\"dimensions\":[{\"type\":\"default\","
-            + "\"dimension\":\"product_id\"}],"
-            + "\"limitSpec\":{\"type\":\"default\"},\"filter\":{\"type\":\"selector\","
-            + "\"dimension\":\"product_id\",\"value\":\"1020\"},"
-            + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"dummy_agg\","
-            + "\"fieldName\":\"dummy_agg\"}],"
-            + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000\"]}";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+            + "'granularity':'all','dimensions':[{'type':'default',"
+            + "'dimension':'product_id'}],"
+            + "'limitSpec':{'type':'default'},'filter':{'type':'selector',"
+            + "'dimension':'product_id','value':'1020'},"
+            + "'aggregations':[{'type':'longSum','name':'dummy_agg',"
+            + "'fieldName':'dummy_agg'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     sql(sql).queryContains(druidChecker(druidQuery)).returnsUnordered("product_id=1020");
   }
 
@@ -456,18 +453,18 @@ public class DruidAdapterIT {
     final String innerQuery = "select \"product_id\" as \"id\" from \"foodmart\" where "
             + "\"product_id\" = 1020";
     final String sql = "select \"id\" from (" + innerQuery + ") group by \"id\"";
-
-    sql(sql).returnsUnordered("id=1020")
-            .queryContains(
-                    druidChecker("{\"queryType\":\"groupBy"
-            + "\",\"dataSource\":\"foodmart\",\"granularity\":\"all\","
-            + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"product_id\"}],"
-            + "\"limitSpec\":{\"type\":\"default\"},"
-            + "\"filter\":{\"type\":\"selector\",\"dimension\":\"product_id\","
-            + "\"value\":\"1020\"},\"aggregations\":[{\"type\":\"longSum\","
-            + "\"name\":\"dummy_agg\",\"fieldName\":\"dummy_agg\"}],"
-            + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000\"]}"));
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+            + "'granularity':'all',"
+            + "'dimensions':[{'type':'default','dimension':'product_id'}],"
+            + "'limitSpec':{'type':'default'},"
+            + "'filter':{'type':'selector','dimension':'product_id','value':'1020'},"
+            + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
+    sql(sql)
+        .returnsUnordered("id=1020")
+        .queryContains(druidChecker(druidQuery));
   }
+
   /** Test case for
    * <a href="https://issues.apache.org/jira/browse/CALCITE-1281">[CALCITE-1281]
    * Druid adapter wrongly returns all numeric values as int or float</a>. */
@@ -557,8 +554,8 @@ public class DruidAdapterIT {
     final String sql = "select distinct \"gender\", \"state_province\"\n"
         + "from \"foodmart\" fetch next 3 rows only";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\",\"dimension\":\"gender\"},"
-            + "{\"type\":\"default\",\"dimension\":\"state_province\"}],'limitSpec':{'type':'default'},"
+        + "'granularity':'all','dimensions':[{'type':'default','dimension':'gender'},"
+        + "{'type':'default','dimension':'state_province'}],'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     final String explain = "PLAN="
@@ -580,8 +577,8 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", \"gender\"\n"
         + "order by s desc limit 3";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\","
-            + "\"dimension\":\"brand_name\"},{\"type\":\"default\",\"dimension\":\"gender\"}],"
+        + "'granularity':'all','dimensions':[{'type':'default','dimension':'brand_name'},"
+        + "{'type':'default','dimension':'gender'}],"
         + "'limitSpec':{'type':'default','limit':3,'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -617,14 +614,13 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by \"brand_name\"\n"
         + "order by s desc limit 3";
-    final String approxDruid = "{'queryType':'topN','dataSource':'foodmart',"
-        + "'granularity':'all',\"dimension\":{\"type\":\"default\",\"dimension\":\"brand_name\"},"
-            + "'metric':'S',"
+    final String approxDruid = "{'queryType':'topN','dataSource':'foodmart','granularity':'all',"
+        + "'dimension':{'type':'default','dimension':'brand_name'},'metric':'S',"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
         + "'threshold':3}";
     final String exactDruid = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'all',\"dimensions\":[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
+        + "'granularity':'all','dimensions':[{'type':'default','dimension':'brand_name'}],"
         + "'limitSpec':{'type':'default','limit':3,"
         + "'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
@@ -660,7 +656,7 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
+        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -694,7 +690,7 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{\"type\":\"default\",\"dimension\":\"brand_name\"}],"
+        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -723,8 +719,7 @@ public class DruidAdapterIT {
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by \"brand_name\"";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{\"type\":\"default\","
-            + "\"dimension\":\"brand_name\"}],"
+        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -1196,8 +1191,8 @@ public class DruidAdapterIT {
         + "    BindableProject(S=[$2], M=[$3], P=[$0])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$30, FLOOR($0, FLAG(MONTH)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]])";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'month','dimensions':[{\"type\":\"default\","
-            + "\"dimension\":\"state_province\"}],"
+        + "'granularity':'month',"
+        + "'dimensions':[{'type':'default','dimension':'state_province'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
         + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
@@ -1227,9 +1222,8 @@ public class DruidAdapterIT {
         + "intervals=[[1997-01-01T00:00:00.000/1997-09-01T00:00:00.000]], "
         + "projects=[[$30, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
         + "aggs=[[SUM($2), MAX($2)]]";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{\"type\":\"default\","
-            + "\"dimension\":\"state_province\"}],"
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'day',"
+        + "'dimensions':[{'type':'default','dimension':'state_province'}],"
         + "'limitSpec':{'type':'default'},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
         + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
@@ -1291,14 +1285,12 @@ public class DruidAdapterIT {
         + "  BindableSort(sort0=[$1], dir0=[DESC], fetch=[2])\n"
         + "    BindableProject(state_province=[$0], CDC=[FLOOR($1)])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], groups=[{30}], aggs=[[COUNT(DISTINCT $29)]])\n";
-
-    final String druidQuery = "{\"queryType\":\"groupBy\","
-                        + "\"dataSource\":\"foodmart\","
-                        + "\"granularity\":\"all\","
-                        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"}],"
-                        + "\"limitSpec\":{\"type\":\"default\"},"
-                        + "\"aggregations\":[{\"type\":\"cardinality\",\"name\":\"$f1\",\"fieldNames\":[\"city\"]}],"
-                        + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000\"]}";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all',"
+        + "'dimensions':[{'type':'default','dimension':'state_province'}],"
+        + "'limitSpec':{'type':'default'},"
+        + "'aggregations':[{'type':'cardinality','name':'$f1','fieldNames':['city']}],"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     sql(sql)
         .explainContains(explain)
         .queryContains(druidChecker(druidQuery))
@@ -1333,9 +1325,9 @@ public class DruidAdapterIT {
         + "and \"quarter\" in ('Q2', 'Q3')\n"
         + "and \"state_province\" = 'WA'";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
-        + "\"dimensions\":[{\"type\":\"default\",\"dimension\":\"state_province\"},"
-            + "{\"type\":\"default\",\"dimension\":\"city\"},{\"type\":\"default\","
-            + "\"dimension\":\"product_name\"}],'limitSpec':{'type':'default'},"
+        + "'dimensions':[{'type':'default','dimension':'state_province'},"
+        + "{'type':'default','dimension':'city'},"
+        + "{'type':'default','dimension':'product_name'}],'limitSpec':{'type':'default'},"
         + "'filter':{'type':'and','fields':[{'type':'selector','dimension':'product_name',"
         + "'value':'High Top Dried Mushrooms'},{'type':'or','fields':[{'type':'selector',"
         + "'dimension':'quarter','value':'Q2'},{'type':'selector','dimension':'quarter',"
@@ -1476,25 +1468,23 @@ public class DruidAdapterIT {
         + "and floor(\"time\" to DAY) < '1997-09-01 00:00:00'\n"
         + "group by \"countryName\", floor(\"time\" TO DAY)\n"
         + "order by c limit 5";
-
     String plan = "BindableProject(countryName=[$0], EXPR$1=[$1], C=[CAST($2):INTEGER NOT NULL])\n"
         + "    BindableSort(sort0=[$2], dir0=[ASC], fetch=[5])\n"
         + "      BindableAggregate(group=[{0, 1}], agg#0=[COUNT()])\n"
         + "        BindableProject(countryName=[$1], EXPR$1=[FLOOR($0, FLAG(DAY))])\n"
         + "          BindableFilter(condition=[AND(>=(FLOOR($0, FLAG(DAY)), 1997-01-01 00:00:00), <(FLOOR($0, FLAG(DAY)), 1997-09-01 00:00:00))])\n"
         + "            DruidQuery(table=[[wiki, wiki]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$0, $5]])";
-
     // NOTE: Druid query only has countryName as the dimension
     // being queried after project is pushed to druid query.
-    String druidQuery = "{\"queryType\":\"select\","
-        + "\"dataSource\":\"wikiticker\","
-        + "\"descending\":false,"
-        + "\"intervals\":[\"1900-01-09T00:00:00.000/2992-01-10T00:00:00.000\"],"
-        + "\"dimensions\":[\"countryName\"],"
-        + "\"metrics\":[],"
-        + "\"granularity\":\"all\","
-        + "\"pagingSpec\":{\"threshold\":16384,\"fromNext\":true},"
-        + "\"context\":{\"druid.query.fetch\":false}}";
+    String druidQuery = "{'queryType':'select',"
+        + "'dataSource':'wikiticker',"
+        + "'descending':false,"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+        + "'dimensions':['countryName'],"
+        + "'metrics':[],"
+        + "'granularity':'all',"
+        + "'pagingSpec':{'threshold':16384,'fromNext':true},"
+        + "'context':{'druid.query.fetch':false}}";
     sql(sql, WIKI).explainContains(plan);
     sql(sql, WIKI).queryContains(druidChecker(druidQuery));
   }
@@ -1511,168 +1501,193 @@ public class DruidAdapterIT {
     String sql = "select \"product_id\" from \"foodmart\"\n"
         + "where cast(\"product_id\" as double) < 0.41024 and \"product_id\" < 12223";
     sql(sql).queryContains(
-        druidChecker("\"type\":\"bound\",\"dimension\":\"product_id\",\"upper\":\"0.41024\"",
-            "\"upper\":\"12223\""));
-  }
-
-  @Test public void testPushAggragateOnTime() {
-    String sql =
-            "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" where "
-                    + "\"product_id\" = 1016 and "
-                    + "\"timestamp\" < cast('1997-01-03' as timestamp) and \"timestamp\" > cast"
-                    + "('1990-01-01' as timestamp)" + " group by"
-                    + "\"timestamp\", \"product_id\" ";
-
-    String druidQuery = "{\"queryType\":\"groupBy\",\"dataSource\":\"foodmart\","
-            + "\"granularity\":\"all\",\"dimensions\":[{\"type\":\"extraction\","
-            + "\"dimension\":\"__time\",\"outputName\":\"timestamp\","
-            + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy-MM-dd";
-    sql(sql).returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
-            .queryContains(druidChecker(druidQuery));
+        druidChecker("'type':'bound','dimension':'product_id','upper':'0.41024'",
+            "'upper':'12223'"));
+  }
+
+  @Test public void testPushAggregateOnTime() {
+    String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" where "
+        + "\"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-03' as timestamp) and \"timestamp\" > cast"
+        + "('1990-01-01' as timestamp)" + " group by"
+        + "\"timestamp\", \"product_id\" ";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'timestamp',"
+        + "'extractionFn':{'type':'timeFormat','format':'yyyy-MM-dd";
+    sql(sql)
+        .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
+        .queryContains(druidChecker(druidQuery));
   }
 
-  @Test public void testPushAggragateOnTimeWithExtractYear() {
-    String sql =
-            "select EXTRACT( year from \"timestamp\") as \"year\",\"product_id\"  from "
-                    + "\"foodmart\" where \"product_id\" = 1016 and "
-                    + "\"timestamp\" < cast('1999-01-02' as timestamp) and \"timestamp\" > cast"
-                    + "('1997-01-01' as timestamp)" + " group by "
-                    + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(druidChecker
-      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-              + "\"dimension\":\"__time\",\"outputName\":\"year\","
-              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
-              + "\"timeZone\":\"UTC\"}}"
-      )
-    ).returnsUnordered("year=1997; product_id=1016");
-  }
-
-  @Test public void testPushAggragateOnTimeWithExtractMonth() {
-    String sql =
-            "select EXTRACT( month from \"timestamp\") as \"month\",\"product_id\"  from "
-                    + "\"foodmart\" where \"product_id\" = 1016 and "
-                    + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
-                    + "('1997-01-01' as timestamp)" + " group by "
-                    + " EXTRACT( month from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(druidChecker
-      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-              + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
-              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
-              + "\"timeZone\":\"UTC\"}}"
-      )
-    ).returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
-            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016"
-    );
-  }
-
-  @Test public void testPushAggragateOnTimeWithExtractDay() {
-    String sql =
-            "select EXTRACT( day from \"timestamp\") as \"day\",\"product_id\"  from \"foodmart\""
-                    + " where \"product_id\" = 1016 and "
-                    + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
-                    + "('1997-01-01' as timestamp)" + " group by "
-                    + " EXTRACT( day from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(druidChecker
-      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-              + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
-              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
-              + "\"timeZone\":\"UTC\"}}"
-      )
-    ).returnsUnordered("day=02; product_id=1016", "day=10; product_id=1016",
-            "day=13; product_id=1016", "day=16; product_id=1016"
-    );
-  }
-
-  //Calcite rewrite this push as
+  @Test public void testPushAggregateOnTimeWithExtractYear() {
+    String sql = "select EXTRACT( year from \"timestamp\") as \"year\",\"product_id\" from "
+        + "\"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1999-01-02' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC'}}"))
+        .returnsUnordered("year=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractMonth() {
+    String sql = "select EXTRACT( month from \"timestamp\") as \"month\",\"product_id\" from "
+        + "\"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( month from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'MM',"
+                    + "'timeZone':'UTC'}}"))
+        .returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
+            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractDay() {
+    String sql = "select EXTRACT( day from \"timestamp\") as \"day\","
+        + "\"product_id\" from \"foodmart\""
+        + " where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( day from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'dd',"
+                    + "'timeZone':'UTC'}}"))
+        .returnsUnordered("day=02; product_id=1016", "day=10; product_id=1016",
+            "day=13; product_id=1016", "day=16; product_id=1016");
+  }
+
+  // Calcite rewrite the extract function in the query as:
   // rel#85:BindableProject.BINDABLE.[](input=rel#69:Subset#1.BINDABLE.[],
-  // hourOfDay=/INT(MOD(Reinterpret($0), 86400000), 3600000),product_id=$1)
-  // hence not sure if that is valid for hive as well.
-  @Ignore @Test public void testPushAggragateOnTimeWithExtractHourOfDay() {
+  // hourOfDay=/INT(MOD(Reinterpret($0), 86400000), 3600000),product_id=$1).
+  // Currently 'EXTRACT( hour from \"timestamp\")' is not pushed to Druid.
+  @Ignore @Test public void testPushAggregateOnTimeWithExtractHourOfDay() {
     String sql =
-            "select EXTRACT( hour from \"timestamp\") as \"hourOfDay\",\"product_id\"  from "
-                    + "\"foodmart\" where \"product_id\" = 1016 and "
-                    + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
-                    + "('1997-01-01' as timestamp)" + " group by "
-                    + " EXTRACT( hour from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(druidChecker
-      (",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-              + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
-              + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
-              + "\"timeZone\":\"UTC\"}}"
-      )
-    ).returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
-            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016"
-    );
-  }
-
-  @Test public void testPushAggragateOnTimeWithExtractYearMonthDay() {
+        "select EXTRACT( hour from \"timestamp\") as \"hourOfDay\",\"product_id\"  from "
+            + "\"foodmart\" where \"product_id\" = 1016 and "
+            + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+            + "('1997-01-01' as timestamp)" + " group by "
+            + " EXTRACT( hour from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'H',"
+                    + "'timeZone':'UTC'}}"))
+        .returnsUnordered("month=01; product_id=1016", "month=02; product_id=1016",
+            "month=03; product_id=1016", "month=04; product_id=1016", "month=05; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractYearMonthDay() {
     String sql = "select EXTRACT( day from \"timestamp\") as \"day\", EXTRACT( month from "
-            + "\"timestamp\") as \"month\",  EXTRACT( year from \"timestamp\") as \"year\",\""
-            + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
-            + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
-            + "('1997-01-01' as timestamp)"
-            + " group by "
-            + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
-            + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(
-            druidChecker(",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
-                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
-                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"year\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
-                    + "\"timeZone\":\"UTC\"}}")).explainContains("PLAN=EnumerableInterpreter\n"
+        + "\"timestamp\") as \"month\",  EXTRACT( year from \"timestamp\") as \"year\",\""
+        + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'dd',"
+                    + "'timeZone':'UTC'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_1',"
+                    + "'extractionFn':{'type':'timeFormat','format':'MM',"
+                    + "'timeZone':'UTC'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_2',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
             + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
             + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
             + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
-            .returnsUnordered("day=02; month=01; year=1997; product_id=1016",
-                    "day=10; month=01; year=1997; product_id=1016",
-                    "day=13; month=01; year=1997; product_id=1016",
-                    "day=16; month=01; year=1997; product_id=1016"
-    );
+        .returnsUnordered("day=02; month=01; year=1997; product_id=1016",
+            "day=10; month=01; year=1997; product_id=1016",
+            "day=13; month=01; year=1997; product_id=1016",
+            "day=16; month=01; year=1997; product_id=1016");
   }
 
-  @Test public void testPushAggragateOnTimeWithExtractYearMonthDayWithOutRenaming() {
+  @Test public void testPushAggregateOnTimeWithExtractYearMonthDayWithOutRenaming() {
     String sql = "select EXTRACT( day from \"timestamp\"), EXTRACT( month from "
-            + "\"timestamp\"),  EXTRACT( year from \"timestamp\"),\""
-            + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
-            + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
-            + "('1997-01-01' as timestamp)"
-            + " group by "
-            + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
-            + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
-
-    sql(sql).queryContains(
-            druidChecker(",\"granularity\":\"all\"", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"dayOfMonth\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"dd\","
-                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"monthOfYear\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"MM\","
-                    + "\"timeZone\":\"UTC\"}}", "{\"type\":\"extraction\","
-                    + "\"dimension\":\"__time\",\"outputName\":\"year\","
-                    + "\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy\","
-                    + "\"timeZone\":\"UTC\"}}")).explainContains("PLAN=EnumerableInterpreter\n"
+        + "\"timestamp\"), EXTRACT( year from \"timestamp\"),\""
+        + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'dd',"
+                    + "'timeZone':'UTC'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_1',"
+                    + "'extractionFn':{'type':'timeFormat','format':'MM',"
+                    + "'timeZone':'UTC'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_2',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
             + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
             + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
             + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
-            .returnsUnordered("EXPR$0=02; EXPR$1=01; EXPR$2=1997; product_id=1016",
-                    "EXPR$0=10; EXPR$1=01; EXPR$2=1997; product_id=1016",
-                    "EXPR$0=13; EXPR$1=01; EXPR$2=1997; product_id=1016",
-                    "EXPR$0=16; EXPR$1=01; EXPR$2=1997; product_id=1016"
-    );
+        .returnsUnordered("EXPR$0=02; EXPR$1=01; EXPR$2=1997; product_id=1016",
+            "EXPR$0=10; EXPR$1=01; EXPR$2=1997; product_id=1016",
+            "EXPR$0=13; EXPR$1=01; EXPR$2=1997; product_id=1016",
+            "EXPR$0=16; EXPR$1=01; EXPR$2=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractWithOutRenaming() {
+    String sql = "select EXTRACT( day from \"timestamp\"), "
+        + "\"product_id\" as \"dayOfMonth\" from \"foodmart\" "
+        + "where \"product_id\" = 1016 and \"timestamp\" < cast('1997-01-20' as timestamp) "
+        + "and \"timestamp\" > cast('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( day from \"timestamp\"),"
+        + " \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'extractionFn':{'type':'timeFormat','format':'dd',"
+                    + "'timeZone':'UTC'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)], "
+            + "projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), $1]], "
+            + "groups=[{0, 1}], aggs=[[]])\n")
+        .returnsUnordered("EXPR$0=02; dayOfMonth=1016", "EXPR$0=10; dayOfMonth=1016",
+            "EXPR$0=13; dayOfMonth=1016", "EXPR$0=16; dayOfMonth=1016");
   }
 }