Posted to commits@calcite.apache.org by jc...@apache.org on 2017/04/19 17:35:10 UTC

calcite git commit: [CALCITE-1758] Push to Druid OrderBy/Limit operation over time dimension and additional columns (Slim Bouguerra)

Repository: calcite
Updated Branches:
  refs/heads/master a0a4f3715 -> cd136985a


[CALCITE-1758] Push to Druid OrderBy/Limit operation over time dimension and additional columns (Slim Bouguerra)

Close apache/calcite#433
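
For readers skimming the diff: a hedged sketch, not part of the commit, of the
class of query this change targets. The SQL mirrors the wikiticker test in
DruidAdapterIT below; before this patch the ORDER BY ran outside Druid in a
BindableSort, afterwards it travels inside the Druid groupBy's limitSpec.

    // Sketch in the style of DruidAdapterIT; names come from the tests below.
    final String sql = "select \"page\", floor(\"__time\" to DAY) as \"day\",\n"
        + "  sum(\"added\") as \"s\"\n"
        + "from \"wikiticker\"\n"
        + "group by \"page\", floor(\"__time\" to DAY)\n"
        + "order by \"s\" desc";
    // After this change the sort is carried by the Druid query itself:
    // 'limitSpec':{'type':'default',
    //     'columns':[{'dimension':'s','direction':'descending'}]}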


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/cd136985
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/cd136985
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/cd136985

Branch: refs/heads/master
Commit: cd136985af5b9d0b74751bd65d9669996363786e
Parents: a0a4f37
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Mon Apr 17 10:36:28 2017 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Apr 19 18:34:38 2017 +0100

----------------------------------------------------------------------
 .../calcite/adapter/druid/DruidQuery.java       |  85 ++--
 .../calcite/adapter/druid/DruidRules.java       |  64 +--
 .../adapter/druid/ExtractionFunctionUtil.java   |   6 +-
 .../druid/TimeExtractionDimensionSpec.java      |  25 +-
 .../adapter/druid/TimeExtractionFunction.java   |  18 +-
 .../org/apache/calcite/test/DruidAdapterIT.java | 399 +++++++++++++------
 6 files changed, 400 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 05567cc..fe440cc 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -60,6 +60,7 @@ import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
 import org.apache.calcite.util.Pair;
@@ -71,6 +72,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -78,6 +80,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.calcite.sql.SqlKind.INPUT_REF;
@@ -100,6 +103,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
   private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
   private static final String EXTRACT_COLUMN_NAME_PREFIX = "extract";
+  private static final String FLOOR_COLUMN_NAME_PREFIX = "floor";
   protected static final String DRUID_QUERY_FETCH = "druid.query.fetch";
 
   /**
@@ -377,7 +381,9 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         // A plan where all extra columns are pruned will be preferred.
         .multiplyBy(
             RelMdUtil.linear(querySpec.fieldNames.size(), 2, 100, 1d, 2d))
-        .multiplyBy(getQueryTypeCostMultiplier());
+        .multiplyBy(getQueryTypeCostMultiplier())
+        // A plan with the sort pushed to Druid is preferable to sorting outside of Druid
+        .multiplyBy(Util.last(rels) instanceof Sort ? 0.1 : 1.0);
   }
 
   private double getQueryTypeCostMultiplier() {
@@ -491,6 +497,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     QueryType queryType = QueryType.SELECT;
     final Translator translator = new Translator(druidTable, rowType);
     List<String> fieldNames = rowType.getFieldNames();
+    Set<String> usedFieldNames = Sets.newHashSet(fieldNames);
 
     // Handle filter
     Json jsonFilter = null;
@@ -515,7 +522,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     // executed as a Timeseries, TopN, or GroupBy in Druid
     final List<DimensionSpec> dimensions = new ArrayList<>();
     final List<JsonAggregation> aggregations = new ArrayList<>();
-    Granularity granularity = Granularity.ALL;
+    Granularity finalGranularity = Granularity.ALL;
     Direction timeSeriesDirection = null;
     JsonLimit limit = null;
     TimeExtractionDimensionSpec timeExtractionDimensionSpec = null;
@@ -525,24 +532,20 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       assert aggCalls.size() == aggNames.size();
 
       int timePositionIdx = -1;
-      int extractNumber = -1;
       final ImmutableList.Builder<String> builder = ImmutableList.builder();
       if (projects != null) {
         for (int groupKey : groupSet) {
-          final String s = fieldNames.get(groupKey);
+          final String fieldName = fieldNames.get(groupKey);
           final RexNode project = projects.get(groupKey);
           if (project instanceof RexInputRef) {
             // Reference could be to the timestamp or druid dimension but no druid metric
             final RexInputRef ref = (RexInputRef) project;
-            final String origin = druidTable.getRowType(getCluster().getTypeFactory())
+            final String originalFieldName = druidTable.getRowType(getCluster().getTypeFactory())
                 .getFieldList().get(ref.getIndex()).getName();
-            if (origin.equals(druidTable.timestampFieldName)) {
-              granularity = Granularity.ALL;
-              // Generate unique name as timestampFieldName is taken
-              String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-              while (fieldNames.contains(extractColumnName)) {
-                extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-              }
+            if (originalFieldName.equals(druidTable.timestampFieldName)) {
+              finalGranularity = Granularity.ALL;
+              String extractColumnName = SqlValidatorUtil.uniquify(EXTRACT_COLUMN_NAME_PREFIX,
+                  usedFieldNames, SqlValidatorUtil.EXPR_SUGGESTER);
               timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
                   extractColumnName);
               dimensions.add(timeExtractionDimensionSpec);
@@ -550,38 +553,46 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
               assert timePositionIdx == -1;
               timePositionIdx = groupKey;
             } else {
-              dimensions.add(new DefaultDimensionSpec(s));
-              builder.add(s);
+              dimensions.add(new DefaultDimensionSpec(fieldName));
+              builder.add(fieldName);
             }
           } else if (project instanceof RexCall) {
             // Call, check if we should infer granularity
             final RexCall call = (RexCall) project;
-            final Granularity funcGranularity =
-                DruidDateTimeUtils.extractGranularity(call);
+            final Granularity funcGranularity = DruidDateTimeUtils.extractGranularity(call);
             if (funcGranularity != null) {
               if (call.getKind().equals(SqlKind.EXTRACT)) {
-                // case extract on time
-                granularity = Granularity.ALL;
-                // Generate unique name as timestampFieldName is taken
-                String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-                while (fieldNames.contains(extractColumnName)) {
-                  extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-                }
+                // case extract field from time column
+                finalGranularity = Granularity.ALL;
+                String extractColumnName = SqlValidatorUtil.uniquify(EXTRACT_COLUMN_NAME_PREFIX
+                    + "_" + funcGranularity.value, usedFieldNames, SqlValidatorUtil.EXPR_SUGGESTER);
                 timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeExtract(
                     funcGranularity, extractColumnName);
                 dimensions.add(timeExtractionDimensionSpec);
                 builder.add(extractColumnName);
               } else {
-                // case floor by granularity
-                granularity = funcGranularity;
-                builder.add(s);
+                // case floor time column
+                if (groupSet.cardinality() > 1) {
+                  // case more than one group-by key: this becomes a Druid groupBy query
+                  String extractColumnName = SqlValidatorUtil.uniquify(FLOOR_COLUMN_NAME_PREFIX
+                      + "_" + funcGranularity.value, usedFieldNames, SqlValidatorUtil
+                      .EXPR_SUGGESTER);
+                  dimensions.add(
+                      TimeExtractionDimensionSpec.makeFloor(funcGranularity, extractColumnName));
+                  finalGranularity = Granularity.ALL;
+                  builder.add(extractColumnName);
+                } else {
+                  // case timeseries: we cannot use an extraction function
+                  finalGranularity = funcGranularity;
+                  builder.add(fieldName);
+                }
                 assert timePositionIdx == -1;
                 timePositionIdx = groupKey;
               }
 
             } else {
-              dimensions.add(new DefaultDimensionSpec(s));
-              builder.add(s);
+              dimensions.add(new DefaultDimensionSpec(fieldName));
+              builder.add(fieldName);
             }
           } else {
             throw new AssertionError("incompatible project expression: " + project);
@@ -591,12 +602,10 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         for (int groupKey : groupSet) {
           final String s = fieldNames.get(groupKey);
           if (s.equals(druidTable.timestampFieldName)) {
-            granularity = Granularity.ALL;
+            finalGranularity = Granularity.ALL;
             // Generate unique name as timestampFieldName is taken
-            String extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-            while (fieldNames.contains(extractColumnName)) {
-              extractColumnName = EXTRACT_COLUMN_NAME_PREFIX + "_" + (++extractNumber);
-            }
+            String extractColumnName = SqlValidatorUtil.uniquify(EXTRACT_COLUMN_NAME_PREFIX,
+                usedFieldNames, SqlValidatorUtil.EXPR_SUGGESTER);
             timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
                 extractColumnName);
             dimensions.add(timeExtractionDimensionSpec);
@@ -645,7 +654,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         queryType = QueryType.TIMESERIES;
         assert fetch == null;
       } else if (dimensions.size() == 1
-          && granularity == Granularity.ALL
+          && finalGranularity == Granularity.ALL
           && sortsMetric
           && collations.size() == 1
           && fetch != null
@@ -680,7 +689,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         generator.writeStringField("dataSource", druidTable.dataSource);
         generator.writeBooleanField("descending", timeSeriesDirection != null
             && timeSeriesDirection == Direction.DESCENDING);
-        generator.writeStringField("granularity", granularity.value);
+        generator.writeStringField("granularity", finalGranularity.value);
         writeFieldIf(generator, "filter", jsonFilter);
         writeField(generator, "aggregations", aggregations);
         writeFieldIf(generator, "postAggregations", null);
@@ -700,7 +709,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
         generator.writeStringField("queryType", "topN");
         generator.writeStringField("dataSource", druidTable.dataSource);
-        generator.writeStringField("granularity", granularity.value);
+        generator.writeStringField("granularity", finalGranularity.value);
         writeField(generator, "dimension", dimensions.get(0));
         generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0)));
         writeFieldIf(generator, "filter", jsonFilter);
@@ -716,7 +725,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         generator.writeStartObject();
         generator.writeStringField("queryType", "groupBy");
         generator.writeStringField("dataSource", druidTable.dataSource);
-        generator.writeStringField("granularity", granularity.value);
+        generator.writeStringField("granularity", finalGranularity.value);
         writeField(generator, "dimensions", dimensions);
         writeFieldIf(generator, "limitSpec", limit);
         writeFieldIf(generator, "filter", jsonFilter);
@@ -738,7 +747,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         writeFieldIf(generator, "filter", jsonFilter);
         writeField(generator, "dimensions", translator.dimensions);
         writeField(generator, "metrics", translator.metrics);
-        generator.writeStringField("granularity", granularity.value);
+        generator.writeStringField("granularity", finalGranularity.value);
 
         generator.writeFieldName("pagingSpec");
         generator.writeStartObject();
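
A minimal sketch, not from the patch, of the renaming now delegated to
SqlValidatorUtil.uniquify in place of the hand-rolled extractNumber loop; the
field names below are made up for illustration.

    import com.google.common.collect.Sets;
    import org.apache.calcite.sql.validate.SqlValidatorUtil;

    import java.util.Set;

    public class UniquifySketch {
      public static void main(String[] args) {
        Set<String> usedFieldNames = Sets.newHashSet("page", "extract");
        // "extract" collides, so uniquify returns a suffixed variant (the exact
        // suffix is up to EXPR_SUGGESTER) and records it in usedFieldNames, so
        // later calls keep producing distinct column names.
        String col = SqlValidatorUtil.uniquify("extract", usedFieldNames,
            SqlValidatorUtil.EXPR_SUGGESTER);
        System.out.println(col);
      }
    }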

http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 309d1f2..8a02fd9 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -565,10 +565,9 @@ public class DruidRules {
         return;
       }
       // Either it is:
+      // - a sort and limit on a dimension/metric that is part of the Druid groupBy query, or
       // - a sort without limit on the time column on top of
       //     Agg operator (transformable to timeseries query), or
-      // - it is a sort w/o limit on columns that do not include
-      //     the time column on top of Agg operator, or
       // - a simple limit on top of other operator than Agg
       if (!validSortLimit(sort, query)) {
         return;
@@ -587,35 +586,21 @@ public class DruidRules {
       if (query.getTopNode() instanceof Aggregate) {
         final Aggregate topAgg = (Aggregate) query.getTopNode();
         final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder();
-        int metricsRefs = 0;
         for (RelFieldCollation col : sort.collation.getFieldCollations()) {
           int idx = col.getFieldIndex();
           if (idx >= topAgg.getGroupCount()) {
-            metricsRefs++;
             continue;
           }
+          // Record the positions of the columns used as sort keys
           positionsReferenced.set(topAgg.getGroupSet().nth(idx));
         }
-        boolean refsTimestamp =
-                checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput(), query);
-        if (refsTimestamp && metricsRefs != 0) {
-          // Metrics reference timestamp too
-          return false;
-        }
-        // If the aggregate is grouping by timestamp (or a function of the
-        // timestamp such as month) then we cannot push Sort to Druid.
-        // Druid's topN and groupBy operators would sort only within the
-        // granularity, whereas we want global sort.
-        final boolean aggregateRefsTimestamp =
-            checkTimestampRefOnQuery(topAgg.getGroupSet(), topAgg.getInput(), query);
-        if (aggregateRefsTimestamp && metricsRefs != 0) {
-          return false;
-        }
-        if (refsTimestamp
-            && sort.collation.getFieldCollations().size() == 1
+        // Case it is a timeseries query
+        if (checkIsFlooringTimestampRefOnQuery(topAgg.getGroupSet(), topAgg.getInput(), query)
             && topAgg.getGroupCount() == 1) {
-          // Timeseries query: if it has a limit, we cannot push
-          return !RelOptUtil.isLimit(sort);
+          // Do not push if it has a limit, more than one sort key, or a sort
+          // by a metric/dimension
+          return !RelOptUtil.isLimit(sort) && sort.collation.getFieldCollations().size() == 1
+              && checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput(), query);
         }
         return true;
       }
@@ -625,6 +610,36 @@ public class DruidRules {
     }
   }
 
+  /** Returns true if any of the grouping keys is a floor operation over the timestamp column. */
+  private static boolean checkIsFlooringTimestampRefOnQuery(ImmutableBitSet set, RelNode top,
+      DruidQuery query) {
+    if (top instanceof Project) {
+      ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
+      final Project project = (Project) top;
+      for (int index : set) {
+        RexNode node = project.getProjects().get(index);
+        if (node instanceof RexCall) {
+          RexCall call = (RexCall) node;
+          assert DruidDateTimeUtils.extractGranularity(call) != null;
+          if (call.getKind().equals(SqlKind.FLOOR)) {
+            newSet.addAll(RelOptUtil.InputFinder.bits(call));
+          }
+        }
+      }
+      top = project.getInput();
+      set = newSet.build();
+    }
+    // Check if any references the timestamp column
+    for (int index : set) {
+      if (query.druidTable.timestampFieldName.equals(
+          top.getRowType().getFieldNames().get(index))) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   /** Checks whether any of the references leads to the timestamp column. */
   private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top,
       DruidQuery query) {
@@ -638,7 +653,8 @@ public class DruidRules {
         } else if (node instanceof RexCall) {
           RexCall call = (RexCall) node;
           assert DruidDateTimeUtils.extractGranularity(call) != null;
-          newSet.set(((RexInputRef) call.getOperands().get(0)).getIndex());
+          // For extract on the time column the RexCall has the form /INT(Reinterpret($0), ...)
+          newSet.addAll(RelOptUtil.InputFinder.bits(call));
         }
       }
       top = project.getInput();
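
The switch from casting operand 0 to RexInputRef over to
RelOptUtil.InputFinder.bits is worth a sketch. For a projected call such as
EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)) (see the plans in
the tests below), operand 0 is itself a call, so the old cast would fail;
InputFinder walks the whole expression and collects every input reference.
A self-contained toy, with a PLUS call standing in for the Reinterpret chain:

    import java.math.BigDecimal;

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.plan.RelOptUtil;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class InputFinderSketch {
      public static void main(String[] args) {
        final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
        final RelDataType intType =
            rexBuilder.getTypeFactory().createSqlType(SqlTypeName.INTEGER);
        // $0 + 1: the input $0 is buried inside a call, not a bare RexInputRef.
        final RexNode call = rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
            rexBuilder.makeInputRef(intType, 0),
            rexBuilder.makeExactLiteral(BigDecimal.ONE));
        // Collects every input reference under the call: prints {0}.
        System.out.println(RelOptUtil.InputFinder.bits(call));
      }
    }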

http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunctionUtil.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunctionUtil.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunctionUtil.java
index f3c71f3..b7cf372 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunctionUtil.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractionFunctionUtil.java
@@ -51,11 +51,11 @@ public final class ExtractionFunctionUtil {
         }
         switch (timeUnit) {
         case YEAR:
-          return TimeExtractionFunction.createFromGranularity(Granularity.YEAR);
+          return TimeExtractionFunction.createExtractFromGranularity(Granularity.YEAR);
         case MONTH:
-          return TimeExtractionFunction.createFromGranularity(Granularity.MONTH);
+          return TimeExtractionFunction.createExtractFromGranularity(Granularity.MONTH);
         case DAY:
-          return TimeExtractionFunction.createFromGranularity(Granularity.DAY);
+          return TimeExtractionFunction.createExtractFromGranularity(Granularity.DAY);
         default:
           return null;
         }

http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
index 5163ee9..8f38720 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
@@ -31,6 +31,7 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
    * to the given name.
    *
    * @param outputName name of the output column
+   *
    * @return the time extraction DimensionSpec instance
    */
   public static TimeExtractionDimensionSpec makeFullTimeExtract(String outputName) {
@@ -44,8 +45,9 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
    * name. Only YEAR, MONTH, and DAY granularity are supported.
    *
    * @param granularity granularity to apply to the column
-   * @param outputName name of the output column
-   * @return the time extraction DimensionSpec instance or null if granularity
+   * @param outputName  name of the output column
+   *
+   * @return time field extraction DimensionSpec instance or null if granularity
    * is not supported
    */
   public static TimeExtractionDimensionSpec makeExtract(
@@ -53,18 +55,31 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
     switch (granularity) {
     case YEAR:
       return new TimeExtractionDimensionSpec(
-          TimeExtractionFunction.createFromGranularity(granularity), outputName);
+          TimeExtractionFunction.createExtractFromGranularity(granularity), outputName);
     case MONTH:
       return new TimeExtractionDimensionSpec(
-          TimeExtractionFunction.createFromGranularity(granularity), outputName);
+          TimeExtractionFunction.createExtractFromGranularity(granularity), outputName);
     case DAY:
       return new TimeExtractionDimensionSpec(
-          TimeExtractionFunction.createFromGranularity(granularity), outputName);
+          TimeExtractionFunction.createExtractFromGranularity(granularity), outputName);
     // TODO: Support other granularities
     default:
       return null;
     }
   }
+
+
+  /**
+   * Creates a floor time extraction dimension spec for the given granularity and output name.
+   * @param granularity granularity to apply to the time column
+   * @param outputName name of the output column
+   *
+   * @return floor time extraction DimensionSpec instance.
+   */
+  public static TimeExtractionDimensionSpec makeFloor(Granularity granularity, String outputName) {
+    ExtractionFunction fn = TimeExtractionFunction.createFloorFromGranularity(granularity);
+    return new TimeExtractionDimensionSpec(fn, outputName);
+  }
 }
 
 // End TimeExtractionDimensionSpec.java
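
To make the new makeFloor concrete: a hedged sketch of what it produces, with
the JSON shape lifted from the expected strings in the tests below
('floor_day' mirrors the generated output name).

    // Floors __time to day buckets and keeps the full ISO timestamp format.
    TimeExtractionDimensionSpec spec =
        TimeExtractionDimensionSpec.makeFloor(Granularity.DAY, "floor_day");
    // Per the test expectations, this serializes roughly as:
    //   {'type':'extraction','dimension':'__time','outputName':'floor_day',
    //    'extractionFn':{'type':'timeFormat', ...,
    //        'granularity':'day','timeZone':'UTC','locale':'en-US'}}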

http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
index d3fbb28..ff1f1cb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -34,6 +34,7 @@ import static org.apache.calcite.adapter.druid.DruidQuery.writeFieldIf;
  */
 public class TimeExtractionFunction implements ExtractionFunction {
 
+  private static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
   private final String format;
   private final String granularity;
   private final String timeZone;
@@ -62,17 +63,17 @@ public class TimeExtractionFunction implements ExtractionFunction {
    * @return the time extraction function
    */
   public static TimeExtractionFunction createDefault() {
-    return new TimeExtractionFunction("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", null, "UTC", null);
+    return new TimeExtractionFunction(ISO_TIME_FORMAT, null, "UTC", null);
   }
 
   /**
    * Creates the time format extraction function for the given granularity.
-   * Only YEAR, MONTH, DAY, and HOUR granularity are supported.
+   * Only YEAR, MONTH, and DAY granularity are supported.
    *
    * @param granularity granularity to apply to the column
    * @return the time extraction function or null if granularity is not supported
    */
-  public static TimeExtractionFunction createFromGranularity(Granularity granularity) {
+  public static TimeExtractionFunction createExtractFromGranularity(Granularity granularity) {
     switch (granularity) {
     case DAY:
       return new TimeExtractionFunction("d", null, "UTC", Locale.getDefault().toLanguageTag());
@@ -84,6 +85,17 @@ public class TimeExtractionFunction implements ExtractionFunction {
       throw new AssertionError("Extraction " + granularity.value + " is not valid");
     }
   }
+
+  /**
+   * Creates a floor time extraction function for the given granularity.
+   *
+   * @param granularity granularity to apply to the column
+   * @return the floor time extraction function
+   */
+  public static TimeExtractionFunction createFloorFromGranularity(Granularity granularity) {
+    return new TimeExtractionFunction(ISO_TIME_FORMAT, granularity.value, "UTC", Locale
+        .getDefault().toLanguageTag());
+  }
 }
 
 // End TimeExtractionFunction.java
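
A hedged contrast of the two factories after the rename (illustrative, not
from the patch): createExtractFromGranularity emits the field value as a
string (format "d" gives the day of month), while createFloorFromGranularity
keeps the full ISO timestamp but buckets it via the timeFormat extraction's
'granularity' attribute.

    // extract: __time -> "27" (day of month); floor: __time -> day bucket,
    // still formatted as yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
    TimeExtractionFunction extractDay =
        TimeExtractionFunction.createExtractFromGranularity(Granularity.DAY);
    TimeExtractionFunction floorDay =
        TimeExtractionFunction.createFloorFromGranularity(Granularity.DAY);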

http://git-wip-us.apache.org/repos/asf/calcite/blob/cd136985/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 185e3c6..cc4732a 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -246,23 +246,19 @@ public class DruidAdapterIT {
         + "from \"wikiticker\"\n"
         + "group by \"page\", floor(\"__time\" to DAY)\n"
         + "order by \"s\" desc";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[DESC])\n"
-        + "    BindableProject(s=[$2], page=[$0], day=[$1])\n"
-        + "      DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$17, FLOOR($0, FLAG(DAY)), $1]], groups=[{0, 1}], aggs=[[SUM($2)]])\n";
-    final String druidQuery = "{'queryType':'groupBy',"
-        + "'dataSource':'wikiticker','granularity':'day',"
-        + "'dimensions':[{'type':'default','dimension':'page'}],"
-        + "'limitSpec':{'type':'default'},"
-        + "'aggregations':[{'type':'longSum','name':'s','fieldName':'added'}],"
-        + "'intervals':['1900-01-01T00:00:00.000/3000-01-01T00:00:00.000']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(s=[$2], page=[$0], day=[$1])\n"
+        + "    DruidQuery(table=[[wiki, wikiticker]], "
+        + "intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$17, FLOOR"
+        + "($0, FLAG(DAY)), $1]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC])";
     sql(sql, WIKI_AUTO2)
         .limit(1)
         .returnsUnordered("s=199818; page=User:QuackGuru/Electronic cigarettes 1; "
             + "day=2015-09-12 00:00:00")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(
+            druidChecker("'queryType':'groupBy'", "'limitSpec':{'type':'default',"
+            + "'columns':[{'dimension':'s','direction':'descending'}]}"));
   }
 
   @Test public void testSkipEmptyBuckets() {
@@ -326,9 +322,6 @@ public class DruidAdapterIT {
         .queryContains(druidChecker(druidQuery));
   }
 
-  /** Test case for
-   * <a href="https://github.com/druid-io/druid/issues/3905">DRUID-3905</a>. */
-  @Ignore("[DRUID-3905]")
   @Test public void testFilterTimeDistinct() {
     final String sql = "select distinct \"__time\"\n"
         + "from \"wikiticker\"\n"
@@ -337,19 +330,17 @@ public class DruidAdapterIT {
         + "EnumerableInterpreter\n"
         + "  DruidQuery(table=[[wiki, wikiticker]], "
         + "intervals=[[1900-01-01T00:00:00.000/2015-10-12T00:00:00.000]], "
-        + "projects=[[$0]], groups=[{0}], aggs=[[]])\n";
-    final String druidQuery = "{'queryType':'select',"
-        + "'dataSource':'wikiticker','descending':false,"
-        + "'intervals':['1900-01-01T00:00:00.000/2015-10-12T00:00:00.000'],"
-        + "'dimensions':[],'metrics':[],'granularity':'all',"
-        + "'pagingSpec':{'threshold':16384,'fromNext':true},"
-        + "'context':{'druid.query.fetch':false}}";
+        + "groups=[{0}], aggs=[[]])\n";
+    final String subDruidQuery = "{'queryType':'groupBy','dataSource':'wikiticker',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract',"
+        + "'extractionFn':{'type':'timeFormat'";
     sql(sql, WIKI_AUTO2)
         .limit(2)
         .returnsUnordered("__time=2015-09-12 00:46:58",
             "__time=2015-09-12 00:47:00")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(subDruidQuery));
   }
 
   @Test public void testMetadataColumns() throws Exception {
@@ -489,11 +480,10 @@ public class DruidAdapterIT {
   }
 
   @Test public void testSort() {
-    // Note: We do not push down SORT yet
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], groups=[{0, 1}], aggs=[[]])";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], sort0=[1], sort1=[0], dir0=[ASC], dir1=[DESC])";
     final String sql = "select distinct \"gender\", \"state_province\"\n"
         + "from \"foodmart\" order by 2, 1 desc";
     sql(sql)
@@ -503,15 +493,25 @@ public class DruidAdapterIT {
             "gender=F; state_province=OR",
             "gender=M; state_province=WA",
             "gender=F; state_province=WA")
+        .queryContains(
+            druidChecker("{'queryType':'groupBy','dataSource':'foodmart',"
+            + "'granularity':'all','dimensions':[{'type':'default',"
+            + "'dimension':'gender'},{'type':'default',"
+            + "'dimension':'state_province'}],'limitSpec':{'type':'default',"
+            + "'columns':[{'dimension':'state_province','direction':'ascending'},"
+            + "{'dimension':'gender','direction':'descending'}]},"
+            + "'aggregations':[{'type':'longSum','name':'dummy_agg',"
+            + "'fieldName':'dummy_agg'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}"))
         .explainContains(explain);
   }
 
   @Test public void testSortLimit() {
-    // Note: We do not push down SORT-LIMIT into Druid "groupBy" query yet
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[2], fetch=[3])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], groups=[{0, 1}], aggs=[[]])";
+    final String explain = "PLAN=EnumerableLimit(offset=[2], fetch=[3])\n"
+        + "  EnumerableInterpreter\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], sort0=[1], sort1=[0], dir0=[ASC], dir1=[DESC])";
     final String sql = "select distinct \"gender\", \"state_province\"\n"
         + "from \"foodmart\"\n"
         + "order by 2, 1 desc offset 2 rows fetch next 3 rows only";
@@ -550,22 +550,24 @@ public class DruidAdapterIT {
   }
 
   @Test public void testDistinctLimit() {
-    // We do not yet push LIMIT into a Druid "groupBy" query.
     final String sql = "select distinct \"gender\", \"state_province\"\n"
         + "from \"foodmart\" fetch next 3 rows only";
     final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
         + "'granularity':'all','dimensions':[{'type':'default','dimension':'gender'},"
-        + "{'type':'default','dimension':'state_province'}],'limitSpec':{'type':'default'},"
+        + "{'type':'default','dimension':'state_province'}],'limitSpec':{'type':'default',"
+        + "'limit':3,'columns':[]},"
         + "'aggregations':[{'type':'longSum','name':'dummy_agg','fieldName':'dummy_agg'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
-    final String explain = "PLAN="
-        + "EnumerableLimit(fetch=[3])\n"
-        + "  EnumerableInterpreter\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], groups=[{0, 1}], aggs=[[]])";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], fetch=[3])";
     sql(sql)
         .runs()
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("gender=F; state_province=CA", "gender=F; state_province=OR",
+            "gender=F; state_province=WA");
   }
 
   /** Test case for
@@ -661,18 +663,19 @@ public class DruidAdapterIT {
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$2], dir0=[DESC], fetch=[30])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
-        + "projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
-        + "aggs=[[SUM($2)]])\n";
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
+        + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], "
+        + "fetch=[30])";
     sql(sql)
         .runs()
         .returnsStartingWith("brand_name=Ebony; D=1997-07-27 00:00:00; S=135",
             "brand_name=Tri-State; D=1997-05-09 00:00:00; S=120",
             "brand_name=Hermanos; D=1997-05-09 00:00:00; S=115")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(
+            druidChecker("'queryType':'groupBy'", "'granularity':'all'", "'limitSpec"
+            + "':{'type':'default','limit':30,'columns':[{'dimension':'S','direction':'descending'}]}"));
   }
 
   /** Test case for
@@ -689,24 +692,27 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
-        + "'limitSpec':{'type':'default'},"
+    final String druidQueryPart1 = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'brand_name'},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'floor_day','extractionFn':{'type':'timeFormat'";
+    final String druidQueryPart2 = "'granularity':'day',"
+        + "'timeZone':'UTC','locale':'en-US'}}],'limitSpec':{'type':'default',"
+        + "'limit':30,'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$2], dir0=[DESC], fetch=[30])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
-        + "projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
-        + "aggs=[[SUM($2)]])\n";
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
+        + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], "
+        + "fetch=[30])";
     sql(sql)
         .runs()
         .returnsStartingWith("brand_name=Ebony; D=1997-07-27 00:00:00; S=135",
             "brand_name=Tri-State; D=1997-05-09 00:00:00; S=120",
             "brand_name=Hermanos; D=1997-05-09 00:00:00; S=115")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQueryPart1, druidQueryPart2));
   }
 
   /** Test case for
@@ -718,24 +724,21 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by \"brand_name\"";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
-        + "'limitSpec':{'type':'default'},"
-        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
-        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
+    final String subDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'brand_name'},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'floor_day','extractionFn':{'type':'timeFormat'";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
-        + "projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
-        + "aggs=[[SUM($2)]])\n";
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
+        + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[0], dir0=[ASC])";
     sql(sql)
         .runs()
         .returnsStartingWith("brand_name=ADJ; D=1997-01-11 00:00:00; S=2",
             "brand_name=ADJ; D=1997-01-12 00:00:00; S=3",
             "brand_name=ADJ; D=1997-01-17 00:00:00; S=3")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(subDruidQuery));
   }
 
   /** Tests a query that contains no GROUP BY and is therefore executed as a
@@ -1027,8 +1030,9 @@ public class DruidAdapterIT {
         + "group by \"state_province\"\n"
         + "order by \"state_province\"";
     String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], groups=[{30}], aggs=[[COUNT()]])";
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], groups=[{30}], "
+        + "aggs=[[COUNT()]], sort0=[0], dir0=[ASC])";
     sql(sql)
         .limit(2)
         .returnsOrdered("state_province=CA; C=24441",
@@ -1096,13 +1100,11 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) ASC";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
-        + "projects=[[FLOOR($0, FLAG(MONTH)), $89, $71]], groups=[{0}], "
-        + "aggs=[[SUM($1), COUNT($2)]])";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[0], "
+        + "dir0=[ASC])";
     sql(sql)
         .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=7033",
             "M=1997-02-01 00:00:00; S=20957; C=6844",
@@ -1126,13 +1128,12 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) limit 3";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n"
+    final String explain = "PLAN=EnumerableLimit(fetch=[3])\n"
+        + "  EnumerableInterpreter\n"
         + "    DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
-        + "projects=[[FLOOR($0, FLAG(MONTH)), $89, $71]], groups=[{0}], "
-        + "aggs=[[SUM($1), COUNT($2)]])";
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[0], "
+        + "dir0=[ASC])";
     sql(sql)
         .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=7033",
             "M=1997-02-01 00:00:00; S=20957; C=6844",
@@ -1185,15 +1186,19 @@ public class DruidAdapterIT {
         + "order by s desc limit 3";
     // Cannot use a Druid "topN" query, granularity != "all";
     // have to use "groupBy" query followed by external Sort and fetch.
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[DESC], fetch=[3])\n"
-        + "    BindableProject(S=[$2], M=[$3], P=[$0])\n"
-        + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$30, FLOOR($0, FLAG(MONTH)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]])";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'month',"
-        + "'dimensions':[{'type':'default','dimension':'state_province'}],"
-        + "'limitSpec':{'type':'default'},"
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(S=[$2], M=[$3], P=[$0])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$30, FLOOR"
+        + "($0, FLAG(MONTH)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]], sort0=[2], "
+        + "dir0=[DESC], fetch=[3])";
+    final String druidQueryPart1 = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'state_province'},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'floor_month','extractionFn':{'type':'timeFormat','format'";
+    final String druidQueryPart2 = "'granularity':'month','timeZone':'UTC',"
+        + "'locale':'en-US'}}],'limitSpec':{'type':'default','limit':3,"
+        + "'columns':[{'dimension':'S','direction':'descending'}]},"
         + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
         + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
@@ -1202,7 +1207,7 @@ public class DruidAdapterIT {
             "S=12297; M=7; P=WA",
             "S=10640; M=6; P=WA")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQueryPart1, druidQueryPart2));
   }
 
   @Test public void testTopNDayGranularityFiltered() {
@@ -1214,20 +1219,16 @@ public class DruidAdapterIT {
         + " \"timestamp\" < '1997-09-01 00:00:00'\n"
         + "group by \"state_province\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 6";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[DESC], fetch=[6])\n"
-        + "    BindableProject(S=[$2], M=[$3], P=[$0])\n"
-        + "      DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1997-01-01T00:00:00.000/1997-09-01T00:00:00.000]], "
-        + "projects=[[$30, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
-        + "aggs=[[SUM($2), MAX($2)]]";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'day',"
-        + "'dimensions':[{'type':'default','dimension':'state_province'}],"
-        + "'limitSpec':{'type':'default'},"
-        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
-        + "{'type':'longMax','name':'M','fieldName':'unit_sales'}],"
-        + "'intervals':['1997-01-01T00:00:00.000/1997-09-01T00:00:00.000']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(S=[$2], M=[$3], P=[$0])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1997-01-01T00:00:00.000/1997-09-01T00:00:00.000]], projects=[[$30, FLOOR"
+        + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]], sort0=[2], "
+        + "dir0=[DESC], fetch=[6])";
+    final String druidQueryType = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions'";
+    final String limitSpec = "'limitSpec':{'type':'default','limit':6,"
+        + "'columns':[{'dimension':'S','direction':'descending'}]}";
     sql(sql)
         .returnsOrdered("S=2527; M=5; P=OR",
             "S=2525; M=6; P=OR",
@@ -1236,7 +1237,7 @@ public class DruidAdapterIT {
             "S=1691; M=5; P=OR",
             "S=1629; M=5; P=WA")
         .explainContains(explain)
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQueryType, limitSpec));
   }
 
   @Test public void testGroupByHaving() {
@@ -1515,7 +1516,7 @@ public class DruidAdapterIT {
         + "group by \"timestamp\", \"product_id\" ";
     String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
         + "'granularity':'all','dimensions':[{'type':'extraction',"
-        + "'dimension':'__time','outputName':'extract_0',"
+        + "'dimension':'__time','outputName':'extract',"
         + "'extractionFn':{'type':'timeFormat','format':'yyyy-MM-dd";
     sql(sql)
         .queryContains(druidChecker(druidQuery))
@@ -1533,7 +1534,7 @@ public class DruidAdapterIT {
             druidChecker(
                 ",'granularity':'all'",
                 "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_year',"
                     + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .returnsUnordered("year=1997; product_id=1016");
@@ -1550,7 +1551,7 @@ public class DruidAdapterIT {
             druidChecker(
                 ",'granularity':'all'",
                 "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_month',"
                     + "'extractionFn':{'type':'timeFormat','format':'M',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .returnsUnordered("month=1; product_id=1016", "month=2; product_id=1016",
@@ -1569,7 +1570,7 @@ public class DruidAdapterIT {
             druidChecker(
                 ",'granularity':'all'",
                 "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_day',"
                     + "'extractionFn':{'type':'timeFormat','format':'d',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .returnsUnordered("day=2; product_id=1016", "day=10; product_id=1016",
@@ -1613,13 +1614,13 @@ public class DruidAdapterIT {
             druidChecker(
                 ",'granularity':'all'",
                 "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_day',"
                     + "'extractionFn':{'type':'timeFormat','format':'d',"
                     + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_1',"
+                    + "'dimension':'__time','outputName':'extract_month',"
                     + "'extractionFn':{'type':'timeFormat','format':'M',"
                     + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_2',"
+                    + "'dimension':'__time','outputName':'extract_year',"
                     + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .explainContains("PLAN=EnumerableInterpreter\n"
@@ -1647,13 +1648,13 @@ public class DruidAdapterIT {
         .queryContains(
             druidChecker(
                 ",'granularity':'all'", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_day',"
                     + "'extractionFn':{'type':'timeFormat','format':'d',"
                     + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_1',"
+                    + "'dimension':'__time','outputName':'extract_month',"
                     + "'extractionFn':{'type':'timeFormat','format':'M',"
                     + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_2',"
+                    + "'dimension':'__time','outputName':'extract_year',"
                     + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .explainContains("PLAN=EnumerableInterpreter\n"
@@ -1680,7 +1681,7 @@ public class DruidAdapterIT {
         .queryContains(
             druidChecker(
                 ",'granularity':'all'", "{'type':'extraction',"
-                    + "'dimension':'__time','outputName':'extract_0',"
+                    + "'dimension':'__time','outputName':'extract_day',"
                     + "'extractionFn':{'type':'timeFormat','format':'d',"
                     + "'timeZone':'UTC','locale':'en-US'}}"))
         .explainContains("PLAN=EnumerableInterpreter\n"
@@ -1732,9 +1733,9 @@ public class DruidAdapterIT {
             druidChecker("{'queryType':'groupBy','dataSource':'foodmart',"
                 + "'granularity':'all','dimensions':[{'type':'default',"
                 + "'dimension':'product_id'},{'type':'extraction','dimension':'__time',"
-                + "'outputName':'extract_0','extractionFn':{'type':'timeFormat',"
+                + "'outputName':'extract_day','extractionFn':{'type':'timeFormat',"
                 + "'format':'d','timeZone':'UTC','locale':'en-US'}},{'type':'extraction',"
-                + "'dimension':'__time','outputName':'extract_1',"
+                + "'dimension':'__time','outputName':'extract_month',"
                 + "'extractionFn':{'type':'timeFormat','format':'M','timeZone':'UTC',"
                 + "'locale':'en-US'}}],'limitSpec':{'type':'default'},"
                 + "'filter':{'type':'and','fields':[{'type':'bound',"
@@ -1763,12 +1764,12 @@ public class DruidAdapterIT {
             druidChecker("{'queryType':'groupBy','dataSource':'foodmart',"
                 + "'granularity':'all','dimensions':[{'type':'default',"
                 + "'dimension':'product_id'},{'type':'extraction','dimension':'__time',"
-                + "'outputName':'extract_0','extractionFn':{'type':'timeFormat',"
+                + "'outputName':'extract_day','extractionFn':{'type':'timeFormat',"
                 + "'format':'d','timeZone':'UTC','locale':'en-US'}},{'type':'extraction',"
-                + "'dimension':'__time','outputName':'extract_1',"
+                + "'dimension':'__time','outputName':'extract_month',"
                 + "'extractionFn':{'type':'timeFormat','format':'M','timeZone':'UTC',"
                 + "'locale':'en-US'}},{'type':'extraction','dimension':'__time',"
-                + "'outputName':'extract_2','extractionFn':{'type':'timeFormat',"
+                + "'outputName':'extract_year','extractionFn':{'type':'timeFormat',"
                 + "'format':'yyyy','timeZone':'UTC','locale':'en-US'}}],"
                 + "'limitSpec':{'type':'default'},'filter':{'type':'and',"
                 + "'fields':[{'type':'bound','dimension':'product_id','lower':'1549',"
@@ -1794,7 +1795,7 @@ public class DruidAdapterIT {
     String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
         + "'granularity':'all','dimensions':[{'type':'default',"
         + "'dimension':'product_id'},{'type':'extraction','dimension':'__time',"
-        + "'outputName':'extract_0','extractionFn':{'type':'timeFormat',"
+        + "'outputName':'extract_month','extractionFn':{'type':'timeFormat',"
         + "'format':'M','timeZone':'UTC','locale':'en-US'}}],"
         + "'limitSpec':{'type':'default'},'filter':{'type':'and',"
         + "'fields':[{'type':'bound','dimension':'product_id','lower':'1558',"
@@ -1822,7 +1823,7 @@ public class DruidAdapterIT {
             druidChecker("{'queryType':'groupBy',"
                 + "'dataSource':'foodmart','granularity':'all',"
                 + "'dimensions':[{'type':'default','dimension':'product_id'},"
-                + "{'type':'extraction','dimension':'__time','outputName':'extract_0',"
+                + "{'type':'extraction','dimension':'__time','outputName':'extract_month',"
                 + "'extractionFn':{'type':'timeFormat','format':'M','timeZone':'UTC',"
                 + "'locale':'en-US'}}],'limitSpec':{'type':'default'},"
                 + "'filter':{'type':'and','fields':[{'type':'bound',"
@@ -1838,6 +1839,156 @@ public class DruidAdapterIT {
         .returnsUnordered("product_id=1558; EXPR$1=10", "product_id=1558; EXPR$1=11",
             "product_id=1559; EXPR$1=11");
   }
+
+  @Test public void testPushofOrderByWithMonthExtract() {
+    String sqlQuery = "SELECT  extract(month from \"timestamp\") as m , \"product_id\", SUM"
+        + "(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY extract(month from \"timestamp\"), \"product_id\" order by m, s, "
+        + "\"product_id\"";
+    sql(sqlQuery).queryContains(
+        druidChecker("{'queryType':'groupBy',"
+        + "'dataSource':'foodmart','granularity':'all',"
+        + "'dimensions':[{'type':'extraction','dimension':'__time',"
+        + "'outputName':'extract_month','extractionFn':{'type':'timeFormat',"
+        + "'format':'M','timeZone':'UTC','locale':'en-US'}},{'type':'default',"
+        + "'dimension':'product_id'}],'limitSpec':{'type':'default',"
+        + "'columns':[{'dimension':'extract_month','direction':'ascending'},"
+        + "{'dimension':'S','direction':'ascending'},{'dimension':'product_id',"
+        + "'direction':'ascending'}]},'filter':{'type':'bound',"
+        + "'dimension':'product_id','lower':'1558','lowerStrict':false,"
+        + "'ordering':'numeric'},'aggregations':[{'type':'longSum','name':'S',"
+        + "'fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
+            + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), "
+            + "86400000)), $1, $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[0], sort1=[2], "
+            + "sort2=[1], dir0=[ASC], dir1=[ASC], dir2=[ASC])");
+  }
+
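+  /** Tests that a descending ORDER BY on a floored time column, with no
+   * limit, becomes a Druid 'timeseries' query with 'descending':true. */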
+  @Test public void testGroupByFloorTimeWithoutLimit() {
+    final String sql = "select  floor(\"timestamp\" to MONTH) as \"month\"\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH)\n"
+        + "order by \"month\" DESC";
+    sql(sql)
+        .explainContains("PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH))]], groups=[{0}], aggs=[[]], sort0=[0], dir0=[DESC])")
+        .queryContains(druidChecker("'queryType':'timeseries'", "'descending':true"));
+  }
+
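+  /** As {@link #testGroupByFloorTimeWithoutLimit()}, but with LIMIT 3; the
+   * descending sort is still pushed as a timeseries query while the limit
+   * is applied by an EnumerableLimit on top. */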
+  @Test public void testGroupByFloorTimeWithLimit() {
+    final String sql = "select  floor(\"timestamp\" to MONTH) as \"floor_month\"\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH)\n"
+        + "order by \"floor_month\" DESC LIMIT 3";
+    sql(sql).explainContains("PLAN=EnumerableLimit(fetch=[3])\n"
+        + "  EnumerableInterpreter\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH))]], groups=[{0}], aggs=[[]], sort0=[0], dir0=[DESC])")
+        .queryContains(druidChecker("'queryType':'timeseries'", "'descending':true"))
+        .returnsOrdered("floor_month=1997-12-01 00:00:00", "floor_month=1997-11-01 00:00:00",
+            "floor_month=1997-10-01 00:00:00");
+  }
+
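+  /** Tests a mixed-direction ORDER BY over extracted year and month, a
+   * metric and a dimension, with LIMIT 3; it is pushed as a groupBy
+   * 'limitSpec' with 'limit':3 and a direction per column. */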
+  @Test public void testPushOfOrderByYearWithYearMonthExtract() {
+    String sqlQuery = "SELECT year(\"timestamp\") as y, extract(month from \"timestamp\") as m,"
+        + " \"product_id\", SUM(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY year(\"timestamp\"), extract(month from \"timestamp\"), \"product_id\" order"
+        + " by y DESC, m ASC, s DESC, \"product_id\" LIMIT 3";
+    final String expectedPlan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
+        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), 86400000)),"
+        + " EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), $1, $89]], groups=[{0, 1,"
+        + " 2}], aggs=[[SUM($3)]], sort0=[0], sort1=[1], sort2=[3], sort3=[2], dir0=[DESC], "
+        + "dir1=[ASC], dir2=[DESC], dir3=[ASC], fetch=[3])";
+    final String expectedDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract_year',"
+        + "'extractionFn':{'type':'timeFormat','format':'yyyy','timeZone':'UTC',"
+        + "'locale':'en-US'}},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'extract_month','extractionFn':{'type':'timeFormat',"
+        + "'format':'M','timeZone':'UTC','locale':'en-US'}},{'type':'default',"
+        + "'dimension':'product_id'}],'limitSpec':{'type':'default','limit':3,"
+        + "'columns':[{'dimension':'extract_year','direction':'descending'},"
+        + "{'dimension':'extract_month','direction':'ascending'},{'dimension':'S',"
+        + "'direction':'descending'},{'dimension':'product_id',"
+        + "'direction':'ascending'}]},'filter':{'type':'bound',"
+        + "'dimension':'product_id','lower':'1558','lowerStrict':false,"
+        + "'ordering':'numeric'},'aggregations':[{'type':'longSum','name':'S',"
+        + "'fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
+    sql(sqlQuery).explainContains(expectedPlan).queryContains(druidChecker(expectedDruidQuery))
+        .returnsOrdered("Y=1997; M=1; product_id=1558; S=6", "Y=1997; M=1; product_id=1559; S=6",
+            "Y=1997; M=10; product_id=1558; S=9");
+  }
+
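+  /** Tests that an ORDER BY led by the metric 'S' rather than a dimension
+   * is still pushed as a groupBy 'limitSpec', with the metric as the first
+   * sort column. */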
+  @Test public void testPushOfOrderByMetricWithYearMonthExtract() {
+    String sqlQuery = "SELECT year(\"timestamp\") as y, extract(month from \"timestamp\") as m,"
+        + " \"product_id\", SUM(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY year(\"timestamp\"), extract(month from \"timestamp\"), \"product_id\" order"
+        + " by s DESC, m DESC, \"product_id\" LIMIT 3";
+    final String expectedPlan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
+        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), 86400000)),"
+        + " EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), $1, $89]], groups=[{0, 1,"
+        + " 2}], aggs=[[SUM($3)]], sort0=[3], sort1=[1], sort2=[2], dir0=[DESC], dir1=[DESC], "
+        + "dir2=[ASC], fetch=[3])";
+    final String expectedDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract_year',"
+        + "'extractionFn':{'type':'timeFormat','format':'yyyy','timeZone':'UTC',"
+        + "'locale':'en-US'}},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'extract_month','extractionFn':{'type':'timeFormat',"
+        + "'format':'M','timeZone':'UTC','locale':'en-US'}},{'type':'default',"
+        + "'dimension':'product_id'}],'limitSpec':{'type':'default','limit':3,"
+        + "'columns':[{'dimension':'S','direction':'descending'},"
+        + "{'dimension':'extract_month','direction':'descending'},"
+        + "{'dimension':'product_id','direction':'ascending'}]},"
+        + "'filter':{'type':'bound','dimension':'product_id','lower':'1558',"
+        + "'lowerStrict':false,'ordering':'numeric'},"
+        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
+    sql(sqlQuery).explainContains(expectedPlan).queryContains(druidChecker(expectedDruidQuery))
+        .returnsOrdered("Y=1997; M=12; product_id=1558; S=30", "Y=1997; M=3; product_id=1558; S=29",
+            "Y=1997; M=5; product_id=1558; S=27");
+  }
+
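+  /** Tests that a sort on a metric over a floor-to-month grouping is not
+   * pushed: the DruidQuery stays a plain timeseries and the ordering is
+   * done by a BindableSort above it. */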
+  @Test public void testGroupByTimeSortOverMetrics() {
+    final String sqlQuery = "SELECT count(*) as c , SUM(\"unit_sales\") as s, floor(\"timestamp\""
+        + " to month) FROM \"foodmart\" group by floor(\"timestamp\" to month) order by s DESC";
+    sql(sqlQuery)
+        .explainContains("PLAN=EnumerableInterpreter\n"
+        + "  BindableSort(sort0=[$1], dir0=[DESC])\n"
+        + "    BindableProject(C=[$1], S=[$2], EXPR$2=[$0])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89]], groups=[{0}], aggs=[[COUNT(), SUM($1)]])")
+        .queryContains(druidChecker("'queryType':'timeseries'"))
+        .returnsOrdered("C=8716; S=26796; EXPR$2=1997-12-01 00:00:00",
+        "C=8231; S=25270; EXPR$2=1997-11-01 00:00:00",
+        "C=7752; S=23763; EXPR$2=1997-07-01 00:00:00",
+        "C=7710; S=23706; EXPR$2=1997-03-01 00:00:00",
+        "C=7038; S=21697; EXPR$2=1997-08-01 00:00:00",
+        "C=7033; S=21628; EXPR$2=1997-01-01 00:00:00",
+        "C=6912; S=21350; EXPR$2=1997-06-01 00:00:00",
+        "C=6865; S=21081; EXPR$2=1997-05-01 00:00:00",
+        "C=6844; S=20957; EXPR$2=1997-02-01 00:00:00",
+        "C=6662; S=20388; EXPR$2=1997-09-01 00:00:00",
+        "C=6588; S=20179; EXPR$2=1997-04-01 00:00:00",
+        "C=6478; S=19958; EXPR$2=1997-10-01 00:00:00");
+  }
 }
 
 // End DruidAdapterIT.java