Posted to commits@calcite.apache.org by jc...@apache.org on 2018/06/22 18:49:12 UTC

[1/3] calcite git commit: [CALCITE-2286] Support timestamp type for Druid adapter

Repository: calcite
Updated Branches:
  refs/heads/master c12cb4b0d -> b29397d92


http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java b/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
index 1b20b32..9caa413 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
@@ -143,7 +143,7 @@ public class DruidDateRangeRulesTest {
       Matcher<String> intervalMatcher) {
     e = DateRangeRules.replaceTimeUnits(f.rexBuilder, e, "UTC");
     final List<Interval> intervals =
-        DruidDateTimeUtils.createInterval(e, "UTC");
+        DruidDateTimeUtils.createInterval(e);
     assertThat(intervals, notNullValue());
     assertThat(intervals.toString(), intervalMatcher);
   }
@@ -152,7 +152,7 @@ public class DruidDateRangeRulesTest {
     e = DateRangeRules.replaceTimeUnits(f.rexBuilder, e, "UTC");
     final RexNode e2 = f.simplify.simplify(e);
     List<Interval> intervals =
-        DruidDateTimeUtils.createInterval(e2, "UTC");
+        DruidDateTimeUtils.createInterval(e2);
     if (intervals == null) {
       throw new AssertionError("null interval");
     }

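The two hunks above track an API change: DruidDateTimeUtils.createInterval no longer takes a time-zone argument, presumably because the time zone is already applied when DateRangeRules.replaceTimeUnits rewrites the expression. A minimal sketch of the resulting call pattern, assuming a test fixture f and a RexNode e as in the surrounding test code:

    // The time zone is resolved while time units are rewritten ...
    RexNode rewritten = DateRangeRules.replaceTimeUnits(f.rexBuilder, e, "UTC");
    // ... so interval extraction needs only the rewritten expression.
    final List<Interval> intervals = DruidDateTimeUtils.createInterval(rewritten);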
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/test/resources/druid-foodmart-model-timestamp.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-foodmart-model-timestamp.json b/druid/src/test/resources/druid-foodmart-model-timestamp.json
new file mode 100644
index 0000000..ef61444
--- /dev/null
+++ b/druid/src/test/resources/druid-foodmart-model-timestamp.json
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "foodmart",
+  "schemas": [
+    {
+      "type": "custom",
+      "name": "foodmart",
+      "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
+      "operand": {
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
+      },
+      "tables": [
+        {
+          "name": "foodmart",
+          "factory": "org.apache.calcite.adapter.druid.DruidTableFactory",
+          "operand": {
+            "interval": "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z",
+            "timestampColumn": {
+              "name": "timestamp",
+              "type": "timestamp"
+            },
+            "dimensions": [
+              "product_id",
+              "brand_name",
+              "product_name",
+              "SKU",
+              "SRP",
+              "gross_weight",
+              "net_weight",
+              "recyclable_package",
+              "low_fat",
+              "units_per_case",
+              "cases_per_pallet",
+              "shelf_width",
+              "shelf_height",
+              "shelf_depth",
+              "product_class_id",
+              "product_subcategory",
+              "product_category",
+              "product_department",
+              "product_family",
+              "customer_id",
+              "account_num",
+              "lname",
+              "fname",
+              "mi",
+              "address1",
+              "address2",
+              "address3",
+              "address4",
+              "city",
+              "state_province",
+              "postal_code",
+              "country",
+              "customer_region_id",
+              "phone1",
+              "phone2",
+              "birthdate",
+              "marital_status",
+              "yearly_income",
+              "gender",
+              "total_children",
+              "num_children_at_home",
+              "education",
+              "date_accnt_opened",
+              "member_card",
+              "occupation",
+              "houseowner",
+              "num_cars_owned",
+              "fullname",
+              "promotion_id",
+              "promotion_district_id",
+              "promotion_name",
+              "media_type",
+              "cost",
+              "start_date",
+              "end_date",
+              "store_id",
+              "store_type",
+              "region_id",
+              "store_name",
+              "store_number",
+              "store_street_address",
+              "store_city",
+              "store_state",
+              "store_postal_code",
+              "store_country",
+              "store_manager",
+              "store_phone",
+              "store_fax",
+              "first_opened_date",
+              "last_remodel_date",
+              "store_sqft",
+              "grocery_sqft",
+              "frozen_sqft",
+              "meat_sqft",
+              "coffee_bar",
+              "video_store",
+              "salad_bar",
+              "prepared_food",
+              "florist",
+              "time_id",
+              "the_day",
+              "the_month",
+              "the_year",
+              "day_of_month",
+              "week_of_year",
+              "month_of_year",
+              "quarter",
+              "fiscal_period"
+            ],
+            "metrics": [
+              "unit_sales",
+              {
+                "name": "store_sales",
+                "type": "double"
+              },
+              {
+                "name": "store_cost",
+                "type": "double"
+              },
+              {
+                "name" : "customer_id_ts",
+                "type" : "thetaSketch",
+                "fieldName" : "customer_id"
+              }
+            ],
+            "complexMetrics" : [
+              "customer_id"
+            ]
+          }
+        }
+      ]
+    }
+  ]
+}

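The new model above declares the Druid timestamp column as an object with an explicit type, "timestamp", rather than the bare column name used previously; it also registers a thetaSketch metric over customer_id via complexMetrics. A minimal JDBC sketch of using such a model, assuming a local Calcite build (the path and query are illustrative, not part of the commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class TimestampModelDemo {
      public static void main(String[] args) throws Exception {
        final Properties info = new Properties();
        // Hypothetical local path to the model file shown above
        info.setProperty("model",
            "druid/src/test/resources/druid-foodmart-model-timestamp.json");
        try (Connection c = DriverManager.getConnection("jdbc:calcite:", info);
             Statement s = c.createStatement();
             ResultSet r = s.executeQuery("select \"timestamp\" from \"foodmart\""
                 + " fetch next 1 rows only")) {
          while (r.next()) {
            // With type "timestamp", the column surfaces as plain TIMESTAMP
            System.out.println(r.getTimestamp(1));
          }
        }
      }
    }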
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/test/resources/druid-foodmart-model.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-foodmart-model.json b/druid/src/test/resources/druid-foodmart-model.json
index dfca063..d5565d7 100644
--- a/druid/src/test/resources/druid-foodmart-model.json
+++ b/druid/src/test/resources/druid-foodmart-model.json
@@ -32,7 +32,10 @@
           "factory": "org.apache.calcite.adapter.druid.DruidTableFactory",
           "operand": {
             "interval": "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z",
-            "timestampColumn": "timestamp",
+            "timestampColumn": {
+              "name": "timestamp",
+              "type": "timestamp with local time zone"
+            },
             "dimensions": [
               "product_id",
               "brand_name",

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/site/_docs/druid_adapter.md
----------------------------------------------------------------------
diff --git a/site/_docs/druid_adapter.md b/site/_docs/druid_adapter.md
index c81687b..ff4216a 100644
--- a/site/_docs/druid_adapter.md
+++ b/site/_docs/druid_adapter.md
@@ -61,7 +61,10 @@ A basic example of a model file is given below:
           "operand": {
             "dataSource": "wikiticker",
             "interval": "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z",
-            "timestampColumn": "time",
+            "timestampColumn": {
+              "name": "time",
+              "type": "timestamp"
+            },
             "dimensions": [
               "channel",
               "cityName",


[2/3] calcite git commit: [CALCITE-2286] Support timestamp type for Druid adapter

Posted by jc...@apache.org.
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT2.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT2.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT2.java
new file mode 100644
index 0000000..92e2322
--- /dev/null
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT2.java
@@ -0,0 +1,3979 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.adapter.druid.DruidQuery;
+import org.apache.calcite.adapter.druid.DruidSchema;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.druid} package.
+ *
+ * <p>Before calling this test, you need to populate Druid, as follows:
+ *
+ * <blockquote><code>
+ * git clone https://github.com/vlsi/calcite-test-dataset<br>
+ * cd calcite-test-dataset<br>
+ * mvn install
+ * </code></blockquote>
+ *
+ * <p>This will create a virtual machine with Druid and the test data set.
+ *
+ * <p>Features not yet implemented:
+ * <ul>
+ *   <li>push LIMIT into "select" query</li>
+ *   <li>push SORT and/or LIMIT into "groupBy" query</li>
+ *   <li>push HAVING into "groupBy" query</li>
+ * </ul>
+ *
+ * <p>These tests use the "timestamp" type for the Druid timestamp column,
+ * instead of the "timestamp with local time zone" type used by
+ * {@link DruidAdapterIT}.
+ */
+public class DruidAdapterIT2 {
+  /** URL of the "druid-foodmart" model. */
+  public static final URL FOODMART =
+      DruidAdapterIT2.class.getResource("/druid-foodmart-model-timestamp.json");
+
+  /** Whether to run Druid tests. Enabled by default; however, the test is
+   * only included if the "it" profile is activated ({@code -Pit}). To
+   * disable, specify {@code -Dcalcite.test.druid=false} on the Java command
+   * line. */
+  public static final boolean ENABLED =
+      Util.getBooleanProperty("calcite.test.druid", true);
+
+  private static final String VARCHAR_TYPE =
+      "VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"";
+
+  private static final String FOODMART_TABLE = "\"foodmart\"";
+
+  /** Whether to run this test. */
+  protected boolean enabled() {
+    return ENABLED;
+  }
+
+  /** Returns a function that checks that a particular Druid query is
+   * generated to implement a given SQL query.
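+   *
+   * <p>For example, {@code druidChecker("'queryType':'groupBy'")} asserts
+   * that the generated Druid JSON contains {@code "queryType":"groupBy"};
+   * single quotes in each expected fragment are replaced by double quotes,
+   * which keeps the expected strings readable in test code. */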
+  private static Function<List, Void> druidChecker(final String... lines) {
+    return new Function<List, Void>() {
+      public Void apply(List list) {
+        assertThat(list.size(), is(1));
+        DruidQuery.QuerySpec querySpec = (DruidQuery.QuerySpec) list.get(0);
+        for (String line : lines) {
+          final String s = line.replace('\'', '"');
+          assertThat(querySpec.getQueryString(null, -1), containsString(s));
+        }
+        return null;
+      }
+    };
+  }
+
+  /** Creates a query against the {@link #FOODMART} data set with
+   * approximate query settings enabled. */
+  private CalciteAssert.AssertQuery foodmartApprox(String sql) {
+    return approxQuery(FOODMART, sql);
+  }
+
+  private CalciteAssert.AssertQuery approxQuery(URL url, String sql) {
+    return CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", url.getPath()))
+        .with(CalciteConnectionProperty.APPROXIMATE_DISTINCT_COUNT.camelName(), true)
+        .with(CalciteConnectionProperty.APPROXIMATE_TOP_N.camelName(), true)
+        .with(CalciteConnectionProperty.APPROXIMATE_DECIMAL.camelName(), true)
+        .query(sql);
+  }
+
+  /** Creates a query against the data set whose model is given by a URL. */
+  private CalciteAssert.AssertQuery sql(String sql, URL url) {
+    return CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", url.getPath()))
+        .query(sql);
+  }
+
+  /** Creates a query against the {@link #FOODMART} data set. */
+  private CalciteAssert.AssertQuery sql(String sql) {
+    return sql(sql, FOODMART);
+  }
+
+  @Test public void testMetadataColumns() throws Exception {
+    sql("values 1")
+        .withConnection(
+            new Function<Connection, Void>() {
+              public Void apply(Connection c) {
+                try {
+                  final DatabaseMetaData metaData = c.getMetaData();
+                  final ResultSet r =
+                      metaData.getColumns(null, null, "foodmart", null);
+                  Multimap<String, Boolean> map = ArrayListMultimap.create();
+                  while (r.next()) {
+                    map.put(r.getString("TYPE_NAME"), true);
+                  }
+                  System.out.println(map);
+                  // 1 timestamp, 2 double measures, 1 long measure, 88 dimensions
+                  assertThat(map.keySet().size(), is(4));
+                  assertThat(map.values().size(), is(92));
+                  assertThat(map.get("TIMESTAMP(0) NOT NULL").size(), is(1));
+                  assertThat(map.get("DOUBLE").size(), is(2));
+                  assertThat(map.get("BIGINT").size(), is(1));
+                  assertThat(map.get(VARCHAR_TYPE).size(), is(88));
+                } catch (SQLException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              }
+            });
+  }
+
+  @Test public void testSelectDistinct() {
+    final String explain = "PLAN="
+        + "EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{30}], aggs=[[]])";
+    final String sql = "select distinct \"state_province\" from \"foodmart\"";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
+        + "'dimensions':[{'type':'default','dimension':'state_province','outputName':'state_province'"
+        + ",'outputType':'STRING'}],'limitSpec':{'type':'default'},"
+        + "'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql)
+        .returnsUnordered("state_province=CA",
+            "state_province=OR",
+            "state_province=WA")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testSelectGroupBySum() {
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+        + "projects=[[$30, CAST($89):INTEGER]], groups=[{0}], aggs=[[SUM($1)]])";
+    final String sql = "select \"state_province\", sum(cast(\"unit_sales\" as integer)) as u\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\"";
+    sql(sql)
+        .returnsUnordered("state_province=CA; U=74748",
+            "state_province=OR; U=67659",
+            "state_province=WA; U=124366")
+        .explainContains(explain);
+  }
+
+  @Test public void testGroupbyMetric() {
+    final String sql = "select  \"store_sales\" ,\"product_id\" from \"foodmart\" "
+        + "where \"product_id\" = 1020" + "group by \"store_sales\" ,\"product_id\" ";
+    final String plan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=($1, 1020)],"
+        + " projects=[[$90, $1]], groups=[{0, 1}], aggs=[[]])";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
+        + "'dimensions':[{'type':'default','dimension':'store_sales',\"outputName\":\"store_sales\","
+        + "'outputType':'DOUBLE'},{'type':'default','dimension':'product_id','outputName':"
+        + "'product_id','outputType':'STRING'}],'limitSpec':{'type':'default'},"
+        + "'filter':{'type':'bound','dimension':'product_id','lower':'1020','lowerStrict':false,"
+        + "'upper':'1020','upperStrict':false,'ordering':'numeric'},'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql)
+        .explainContains(plan)
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("store_sales=0.51; product_id=1020",
+            "store_sales=1.02; product_id=1020",
+            "store_sales=1.53; product_id=1020",
+            "store_sales=2.04; product_id=1020",
+            "store_sales=2.55; product_id=1020");
+  }
+
+  @Test public void testPushSimpleGroupBy() {
+    final String sql = "select \"product_id\" from \"foodmart\" where "
+        + "\"product_id\" = 1020 group by \"product_id\"";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'product_id','outputName':'product_id','outputType':'STRING'}],"
+        + "'limitSpec':{'type':'default'},'filter':{'type':'bound','dimension':'product_id',"
+        + "'lower':'1020','lowerStrict':false,'upper':'1020','upperStrict':false,"
+        + "'ordering':'numeric'},'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql).returnsUnordered("product_id=1020").queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testComplexPushGroupBy() {
+    final String innerQuery = "select \"product_id\" as \"id\" from \"foodmart\" where "
+        + "\"product_id\" = 1020";
+    final String sql = "select \"id\" from (" + innerQuery + ") group by \"id\"";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all',"
+        + "'dimensions':[{'type':'default','dimension':'product_id','outputName':'product_id',"
+        + "'outputType':'STRING'}],'limitSpec':{'type':'default'},"
+        + "'filter':{'type':'bound','dimension':'product_id','lower':'1020','lowerStrict':false,"
+        + "'upper':'1020','upperStrict':false,'ordering':'numeric'},'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql)
+        .returnsUnordered("id=1020")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1281">[CALCITE-1281]
+   * Druid adapter wrongly returns all numeric values as int or float</a>. */
+  @Test public void testSelectCount() {
+    final String sql = "select count(*) as c from \"foodmart\"";
+    sql(sql)
+        .returns(new Function<ResultSet, Void>() {
+          public Void apply(ResultSet input) {
+            try {
+              assertThat(input.next(), is(true));
+              assertThat(input.getInt(1), is(86829));
+              assertThat(input.getLong(1), is(86829L));
+              assertThat(input.getString(1), is("86829"));
+              assertThat(input.wasNull(), is(false));
+              assertThat(input.next(), is(false));
+              return null;
+            } catch (SQLException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+  }
+
+  @Test public void testSort() {
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], sort0=[1], sort1=[0], dir0=[ASC], dir1=[DESC])";
+    final String sql = "select distinct \"gender\", \"state_province\"\n"
+        + "from \"foodmart\" order by 2, 1 desc";
+    sql(sql)
+        .returnsOrdered("gender=M; state_province=CA",
+            "gender=F; state_province=CA",
+            "gender=M; state_province=OR",
+            "gender=F; state_province=OR",
+            "gender=M; state_province=WA",
+            "gender=F; state_province=WA")
+        .queryContains(
+            druidChecker("{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
+                + "'dimensions':[{'type':'default','dimension':'gender','outputName':'gender',"
+                + "'outputType':'STRING'},{'type':'default','dimension':'state_province',"
+                + "'outputName':'state_province','outputType':'STRING'}],'limitSpec':"
+                + "{'type':'default','columns':[{'dimension':'state_province','direction':'ascending'"
+                + ",'dimensionOrder':'lexicographic'},{'dimension':'gender','direction':'descending',"
+                + "'dimensionOrder':'lexicographic'}]},'aggregations':[],"
+                + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"))
+        .explainContains(explain);
+  }
+
+  @Test public void testSortLimit() {
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[2], fetch=[3])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], sort0=[1], sort1=[0], dir0=[ASC], dir1=[DESC])";
+    final String sql = "select distinct \"gender\", \"state_province\"\n"
+        + "from \"foodmart\"\n"
+        + "order by 2, 1 desc offset 2 rows fetch next 3 rows only";
+    sql(sql)
+        .returnsOrdered("gender=M; state_province=OR",
+            "gender=F; state_province=OR",
+            "gender=M; state_province=WA")
+        .explainContains(explain);
+  }
+
+  @Test public void testOffsetLimit() {
+    // We do not yet push LIMIT into a Druid "scan" query when there is an
+    // OFFSET. It is not possible to push OFFSET into a Druid query.
+    final String sql = "select \"state_province\", \"product_name\"\n"
+        + "from \"foodmart\"\n"
+        + "offset 2 fetch next 3 rows only";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'columns':['state_province','product_name'],"
+        + "'resultFormat':'compactedList'}";
+    sql(sql)
+        .runs()
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testLimit() {
+    final String sql = "select \"gender\", \"state_province\"\n"
+        + "from \"foodmart\" fetch next 3 rows only";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'columns':['gender','state_province'],"
+        + "'resultFormat':'compactedList','limit':3";
+    sql(sql)
+        .runs()
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testDistinctLimit() {
+    final String sql = "select distinct \"gender\", \"state_province\"\n"
+        + "from \"foodmart\" fetch next 3 rows only";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default','dimension':'gender',"
+        + "'outputName':'gender','outputType':'STRING'},"
+        + "{'type':'default','dimension':'state_province','outputName':'state_province',"
+        + "'outputType':'STRING'}],'limitSpec':{'type':'default',"
+        + "'limit':3,'columns':[]},"
+        + "'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$39, $30]], "
+        + "groups=[{0, 1}], aggs=[[]], fetch=[3])";
+    sql(sql)
+        .runs()
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("gender=F; state_province=CA", "gender=F; state_province=OR",
+            "gender=F; state_province=WA");
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1578">[CALCITE-1578]
+   * Druid adapter: wrong semantics of topN query limit with granularity</a>. */
+  @Test public void testGroupBySortLimit() {
+    final String sql = "select \"brand_name\", \"gender\", sum(\"unit_sales\") as s\n"
+        + "from \"foodmart\"\n"
+        + "group by \"brand_name\", \"gender\"\n"
+        + "order by s desc limit 3";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'brand_name','outputName':'brand_name','outputType':'STRING'},"
+        + "{'type':'default','dimension':'gender','outputName':'gender','outputType':'STRING'}],"
+        + "'limitSpec':{'type':'default','limit':3,'columns':[{'dimension':'S',"
+        + "'direction':'descending','dimensionOrder':'numeric'}]},"
+        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$2, $39, $89]], groups=[{0, 1}], "
+        + "aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], fetch=[3])";
+    sql(sql)
+        .runs()
+        .returnsOrdered("brand_name=Hermanos; gender=M; S=4286",
+            "brand_name=Hermanos; gender=F; S=4183",
+            "brand_name=Tell Tale; gender=F; S=4033")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1587">[CALCITE-1587]
+   * Druid adapter: topN returns approximate results</a>. */
+  @Test public void testGroupBySingleSortLimit() {
+    checkGroupBySingleSortLimit(false);
+  }
+
+  /** As {@link #testGroupBySingleSortLimit}, but allowing approximate results
+   * due to {@link CalciteConnectionConfig#approximateTopN()}.
+   * Therefore we send a "topN" query to Druid. */
+  @Test public void testGroupBySingleSortLimitApprox() {
+    checkGroupBySingleSortLimit(true);
+  }
+
+  private void checkGroupBySingleSortLimit(boolean approx) {
+    final String sql = "select \"brand_name\", sum(\"unit_sales\") as s\n"
+        + "from \"foodmart\"\n"
+        + "group by \"brand_name\"\n"
+        + "order by s desc limit 3";
+    final String approxDruid = "{'queryType':'topN','dataSource':'foodmart','granularity':'all',"
+        + "'dimension':{'type':'default','dimension':'brand_name','outputName':'brand_name','outputType':'STRING'},'metric':'S',"
+        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'threshold':3}";
+    final String exactDruid = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
+        + "'dimensions':[{'type':'default','dimension':'brand_name','outputName':'brand_name',"
+        + "'outputType':'STRING'}],'limitSpec':{'type':'default','limit':3,'columns':"
+        + "[{'dimension':'S','direction':'descending','dimensionOrder':'numeric'}]},'aggregations':"
+        + "[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    final String druidQuery = approx ? approxDruid : exactDruid;
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$2, $89]], groups=[{0}], "
+        + "aggs=[[SUM($1)]], sort0=[1], dir0=[DESC], fetch=[3])";
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", FOODMART.getPath()))
+        .with(CalciteConnectionProperty.APPROXIMATE_TOP_N.name(), approx)
+        .query(sql)
+        .runs()
+        .returnsOrdered("brand_name=Hermanos; S=8469",
+            "brand_name=Tell Tale; S=7877",
+            "brand_name=Ebony; S=7438")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1578">[CALCITE-1578]
+   * Druid adapter: wrong semantics of groupBy query limit with granularity</a>.
+   *
+   * <p>Before CALCITE-1578 was fixed, this would use a "topN" query but return
+   * the wrong results. */
+  @Test public void testGroupByDaySortDescLimit() {
+    final String sql = "select \"brand_name\","
+        + " floor(\"timestamp\" to DAY) as d,"
+        + " sum(\"unit_sales\") as s\n"
+        + "from \"foodmart\"\n"
+        + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
+        + "order by s desc limit 30";
+    final String explain =
+        "PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+            + "2992-01-10T00:00:00.000Z]], projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], "
+            + "groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], fetch=[30])";
+    sql(sql)
+        .runs()
+        .returnsStartingWith("brand_name=Ebony; D=1997-07-27 00:00:00; S=135",
+            "brand_name=Tri-State; D=1997-05-09 00:00:00; S=120",
+            "brand_name=Hermanos; D=1997-05-09 00:00:00; S=115")
+        .explainContains(explain)
+        .queryContains(
+            druidChecker("'queryType':'groupBy'", "'granularity':'all'", "'limitSpec"
+                + "':{'type':'default','limit':30,'columns':[{'dimension':'S',"
+                + "'direction':'descending','dimensionOrder':'numeric'}]}"));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1579">[CALCITE-1579]
+   * Druid adapter: wrong semantics of groupBy query limit with
+   * granularity</a>.
+   *
+   * <p>Before CALCITE-1579 was fixed, this would use a "groupBy" query but
+   * wrongly try to use a {@code limitSpec} to sort and filter. (A "topN" query
+   * was not possible because the sort was {@code ASC}.) */
+  @Test public void testGroupByDaySortLimit() {
+    final String sql = "select \"brand_name\","
+        + " floor(\"timestamp\" to DAY) as d,"
+        + " sum(\"unit_sales\") as s\n"
+        + "from \"foodmart\"\n"
+        + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
+        + "order by s desc limit 30";
+    final String druidQueryPart1 = "{'queryType':'groupBy','dataSource':'foodmart'";
+    final String druidQueryPart2 = "'limitSpec':{'type':'default','limit':30,"
+        + "'columns':[{'dimension':'S','direction':'descending',"
+        + "'dimensionOrder':'numeric'}]},'aggregations':[{'type':'longSum',"
+        + "'name':'S','fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}], "
+        + "aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], fetch=[30])";
+    sql(sql)
+        .runs()
+        .returnsStartingWith("brand_name=Ebony; D=1997-07-27 00:00:00; S=135",
+            "brand_name=Tri-State; D=1997-05-09 00:00:00; S=120",
+            "brand_name=Hermanos; D=1997-05-09 00:00:00; S=115")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQueryPart1, druidQueryPart2));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1580">[CALCITE-1580]
+   * Druid adapter: Wrong semantics for ordering within groupBy queries</a>. */
+  @Test public void testGroupByDaySortDimension() {
+    final String sql =
+        "select \"brand_name\", floor(\"timestamp\" to DAY) as d,"
+            + " sum(\"unit_sales\") as s\n"
+            + "from \"foodmart\"\n"
+            + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
+            + "order by \"brand_name\"";
+    final String subDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'brand_name','outputName':'brand_name','outputType':'STRING'},"
+        + "{'type':'extraction','dimension':'__time',"
+        + "'outputName':'floor_day','extractionFn':{'type':'timeFormat'";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$2, FLOOR($0, FLAG(DAY)), $89]], groups=[{0, 1}],"
+        + " aggs=[[SUM($2)]], sort0=[0], dir0=[ASC])";
+    sql(sql)
+        .runs()
+        .returnsStartingWith("brand_name=ADJ; D=1997-01-11 00:00:00; S=2",
+            "brand_name=ADJ; D=1997-01-12 00:00:00; S=3",
+            "brand_name=ADJ; D=1997-01-17 00:00:00; S=3")
+        .explainContains(explain)
+        .queryContains(druidChecker(subDruidQuery));
+  }
+
+  /** Tests a query that contains no GROUP BY and is therefore executed as a
+   * Druid "scan" query. */
+  @Test public void testFilterSortDesc() {
+    final String sql = "select \"product_name\" from \"foodmart\"\n"
+        + "where \"product_id\" BETWEEN '1500' AND '1502'\n"
+        + "order by \"state_province\" desc, \"product_id\"";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'filter':{'type':'and','fields':["
+        + "{'type':'bound','dimension':'product_id','lower':'1500','lowerStrict':false,'ordering':'lexicographic'},"
+        + "{'type':'bound','dimension':'product_id','upper':'1502','upperStrict':false,'ordering':'lexicographic'}]},"
+        + "'columns':['product_name','state_province','product_id'],"
+        + "'resultFormat':'compactedList'";
+    sql(sql)
+        .limit(4)
+        .returns(
+            new Function<ResultSet, Void>() {
+              public Void apply(ResultSet resultSet) {
+                try {
+                  for (int i = 0; i < 4; i++) {
+                    assertTrue(resultSet.next());
+                    assertThat(resultSet.getString("product_name"),
+                        is("Fort West Dried Apricots"));
+                  }
+                  assertFalse(resultSet.next());
+                  return null;
+                } catch (SQLException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            })
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** As {@link #testFilterSortDesc()} but the bounds are numeric. */
+  @Test public void testFilterSortDescNumeric() {
+    final String sql = "select \"product_name\" from \"foodmart\"\n"
+        + "where \"product_id\" BETWEEN 1500 AND 1502\n"
+        + "order by \"state_province\" desc, \"product_id\"";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'filter':{'type':'and','fields':["
+        + "{'type':'bound','dimension':'product_id','lower':'1500','lowerStrict':false,'ordering':'numeric'},"
+        + "{'type':'bound','dimension':'product_id','upper':'1502','upperStrict':false,'ordering':'numeric'}]},"
+        + "'columns':['product_name','state_province','product_id'],"
+        + "'resultFormat':'compactedList'";
+    sql(sql)
+        .limit(4)
+        .returns(
+            new Function<ResultSet, Void>() {
+              public Void apply(ResultSet resultSet) {
+                try {
+                  for (int i = 0; i < 4; i++) {
+                    assertTrue(resultSet.next());
+                    assertThat(resultSet.getString("product_name"),
+                        is("Fort West Dried Apricots"));
+                  }
+                  assertFalse(resultSet.next());
+                  return null;
+                } catch (SQLException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            })
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** Tests a query whose filter removes all rows. */
+  @Test public void testFilterOutEverything() {
+    final String sql = "select \"product_name\" from \"foodmart\"\n"
+        + "where \"product_id\" = -1";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'filter':{'type':'bound','dimension':'product_id','lower':'-1','lowerStrict':false,"
+        + "'upper':'-1','upperStrict':false,'ordering':'numeric'},"
+        + "'columns':['product_name'],"
+        + "'resultFormat':'compactedList'}";
+    sql(sql)
+        .limit(4)
+        .returnsUnordered()
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** As {@link #testFilterSortDescNumeric()} but with a filter that cannot
+   * be pushed down to Druid as a "bound" filter; it is pushed as an
+   * "expression" filter instead. */
+  @Test public void testNonPushableFilterSortDesc() {
+    final String sql = "select \"product_name\" from \"foodmart\"\n"
+        + "where cast(\"product_id\" as integer) - 1500 BETWEEN 0 AND 2\n"
+        + "order by \"state_province\" desc, \"product_id\"";
+    final String druidQuery = "{'queryType':'scan','dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],";
+    final String druidFilter = "\"filter\":{\"type\":\"and\","
+        + "\"fields\":[{\"type\":\"expression\",\"expression\":\"((CAST(\\\"product_id\\\"";
+    final String druidQuery2 = "'columns':['product_name','state_province','product_id'],"
+        + "'resultFormat':'compactedList'}";
+
+    sql(sql)
+        .limit(4)
+        .returns(
+            new Function<ResultSet, Void>() {
+              public Void apply(ResultSet resultSet) {
+                try {
+                  for (int i = 0; i < 4; i++) {
+                    assertTrue(resultSet.next());
+                    assertThat(resultSet.getString("product_name"),
+                        is("Fort West Dried Apricots"));
+                  }
+                  assertFalse(resultSet.next());
+                  return null;
+                } catch (SQLException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            })
+        .queryContains(druidChecker(druidQuery, druidFilter, druidQuery2));
+  }
+
+  @Test public void testUnionPlan() {
+    final String sql = "select distinct \"gender\" from \"foodmart\"\n"
+        + "union all\n"
+        + "select distinct \"marital_status\" from \"foodmart\"";
+    final String explain = "PLAN="
+        + "EnumerableInterpreter\n"
+        + "  BindableUnion(all=[true])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{39}], aggs=[[]])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{37}], aggs=[[]])";
+    sql(sql)
+        .explainContains(explain)
+        .returnsUnordered(
+            "gender=F",
+            "gender=M",
+            "gender=M",
+            "gender=S");
+  }
+
+  @Test public void testFilterUnionPlan() {
+    final String sql = "select * from (\n"
+        + "  select distinct \"gender\" from \"foodmart\"\n"
+        + "  union all\n"
+        + "  select distinct \"marital_status\" from \"foodmart\")\n"
+        + "where \"gender\" = 'M'";
+    final String explain = "PLAN="
+        + "EnumerableInterpreter\n"
+        + "  BindableFilter(condition=[=($0, 'M')])\n"
+        + "    BindableUnion(all=[true])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{39}], aggs=[[]])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{37}], aggs=[[]])";
+    sql(sql)
+        .explainContains(explain)
+        .returnsUnordered("gender=M",
+            "gender=M");
+  }
+
+  @Test public void testCountGroupByEmpty() {
+    final String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
+        + "'descending':false,'granularity':'all',"
+        + "'aggregations':[{'type':'count','name':'EXPR$0'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'context':{'skipEmptyBuckets':false}}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[0]], groups=[{}], aggs=[[COUNT()]])";
+    final String sql = "select count(*) from \"foodmart\"";
+    sql(sql)
+        .returns("EXPR$0=86829\n")
+        .queryContains(druidChecker(druidQuery))
+        .explainContains(explain);
+  }
+
+  @Test public void testGroupByOneColumnNotProjected() {
+    final String sql = "select count(*) as c from \"foodmart\"\n"
+        + "group by \"state_province\" order by 1";
+    sql(sql)
+        .returnsOrdered("C=21610",
+            "C=24441",
+            "C=40778");
+  }
+
+  /** Unlike {@link #testGroupByTimeAndOneColumnNotProjected()}, we cannot use
+   * "topN" because we have a global limit, and that requires
+   * {@code granularity: all}. */
+  @Test public void testGroupByTimeAndOneColumnNotProjectedWithLimit() {
+    final String sql = "select count(*) as \"c\","
+        + " floor(\"timestamp\" to MONTH) as \"month\"\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH), \"state_province\"\n"
+        + "order by \"c\" desc limit 3";
+    sql(sql)
+        .returnsOrdered("c=4070; month=1997-12-01 00:00:00",
+            "c=4033; month=1997-11-01 00:00:00",
+            "c=3511; month=1997-07-01 00:00:00")
+        .queryContains(druidChecker("'queryType':'groupBy'"));
+  }
+
+  @Test public void testGroupByTimeAndOneMetricNotProjected() {
+    final String sql =
+        "select count(*) as \"c\", floor(\"timestamp\" to MONTH) as \"month\", floor"
+            + "(\"store_sales\") as sales\n"
+            + "from \"foodmart\"\n"
+            + "group by floor(\"timestamp\" to MONTH), \"state_province\", floor"
+            + "(\"store_sales\")\n"
+            + "order by \"c\" desc limit 3";
+    sql(sql).returnsOrdered("c=494; month=1997-11-01 00:00:00; SALES=5.0",
+        "c=475; month=1997-12-01 00:00:00; SALES=5.0",
+        "c=468; month=1997-03-01 00:00:00; SALES=5.0").queryContains(druidChecker("'queryType':'groupBy'"));
+  }
+
+  @Test public void testGroupByTimeAndOneColumnNotProjected() {
+    final String sql = "select count(*) as \"c\",\n"
+        + "  floor(\"timestamp\" to MONTH) as \"month\"\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH), \"state_province\"\n"
+        + "having count(*) > 3500";
+    sql(sql)
+        .returnsUnordered("c=3511; month=1997-07-01 00:00:00",
+            "c=4033; month=1997-11-01 00:00:00",
+            "c=4070; month=1997-12-01 00:00:00")
+        .queryContains(druidChecker("'queryType':'groupBy'"));
+  }
+
+  @Test public void testOrderByOneColumnNotProjected() {
+    // Result including state: CA=24441, OR=21610, WA=40778
+    final String sql = "select count(*) as c from \"foodmart\"\n"
+        + "group by \"state_province\" order by \"state_province\"";
+    sql(sql)
+        .returnsOrdered("C=24441",
+            "C=21610",
+            "C=40778");
+  }
+
+  @Test public void testGroupByOneColumn() {
+    final String sql = "select \"state_province\", count(*) as c\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\"\n"
+        + "order by \"state_province\"";
+    String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$30]], groups=[{0}], "
+        + "aggs=[[COUNT()]], sort0=[0], dir0=[ASC])";
+    sql(sql)
+        .limit(2)
+        .returnsOrdered("state_province=CA; C=24441",
+            "state_province=OR; C=21610")
+        .explainContains(explain);
+  }
+
+  @Test public void testGroupByOneColumnReversed() {
+    final String sql = "select count(*) as c, \"state_province\"\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\"\n"
+        + "order by \"state_province\"";
+    sql(sql)
+        .limit(2)
+        .returnsOrdered("C=24441; state_province=CA",
+            "C=21610; state_province=OR");
+  }
+
+  @Test public void testGroupByAvgSumCount() {
+    final String sql = "select \"state_province\",\n"
+        + " avg(\"unit_sales\") as a,\n"
+        + " sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c,\n"
+        + " count(*) as c0\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\"\n"
+        + "order by 1";
+    sql(sql)
+        .limit(2)
+        .returnsUnordered("state_province=CA; A=3; S=74748; C=16347; C0=24441",
+            "state_province=OR; A=3; S=67659; C=21610; C0=21610")
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  BindableProject(state_province=[$0], A=[CAST(/(CASE(=($2, 0), null, $1), $2)):BIGINT],"
+            + " S=[CASE(=($2, 0), null, $1)], C=[$3], C0=[$4])\n"
+            + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+            + "2992-01-10T00:00:00.000Z]], projects=[[$30, $89, $71]], groups=[{0}], "
+            + "aggs=[[$SUM0($1), COUNT($1), COUNT($2), COUNT()]], sort0=[0], dir0=[ASC])")
+        .queryContains(
+            druidChecker("{'queryType':'groupBy','dataSource':'foodmart','granularity':'all'"
+                + ",'dimensions':[{'type':'default','dimension':'state_province','outputName':'state_province'"
+                + ",'outputType':'STRING'}],'limitSpec':"
+                + "{'type':'default','columns':[{'dimension':'state_province',"
+                + "'direction':'ascending','dimensionOrder':'lexicographic'}]},'aggregations':"
+                + "[{'type':'longSum','name':'$f1','fieldName':'unit_sales'},{'type':'filtered',"
+                + "'filter':{'type':'not','field':{'type':'selector','dimension':'unit_sales',"
+                + "'value':null}},'aggregator':{'type':'count','name':'$f2','fieldName':'unit_sales'}}"
+                + ",{'type':'filtered','filter':{'type':'not','field':{'type':'selector',"
+                + "'dimension':'store_sqft','value':null}},'aggregator':{'type':'count','name':'C',"
+                + "'fieldName':'store_sqft'}},{'type':'count','name':'C0'}],"
+                + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"));
+  }
+
+  @Test public void testGroupByMonthGranularity() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH) order by s";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart'";
+    sql(sql)
+        .limit(3)
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  BindableProject(S=[$1], C=[$2])\n"
+            + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+            + "2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, FLAG(MONTH)), $89, $71]], "
+            + "groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[1], dir0=[ASC])")
+        .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; C=5591")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1577">[CALCITE-1577]
+   * Druid adapter: Incorrect result - limit on timestamp disappears</a>. */
+  @Test public void testGroupByMonthGranularitySort() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH)\n"
+        + "order by floor(\"timestamp\" to MONTH) ASC";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[0], "
+        + "dir0=[ASC])";
+    sql(sql)
+        .explainContains(explain)
+        .returnsOrdered("S=21628; C=5957",
+            "S=20957; C=5842",
+            "S=23706; C=6528",
+            "S=20179; C=5523",
+            "S=21081; C=5793",
+            "S=21350; C=5863",
+            "S=23763; C=6762",
+            "S=21697; C=5915",
+            "S=20388; C=5591",
+            "S=19958; C=5606",
+            "S=25270; C=7026",
+            "S=26796; C=7338");
+  }
+
+  @Test public void testGroupByMonthGranularitySortLimit() {
+    final String sql = "select floor(\"timestamp\" to MONTH) as m,\n"
+        + " sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH)\n"
+        + "order by floor(\"timestamp\" to MONTH) limit 3";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, FLAG(MONTH)), $89, $71]], groups=[{0}], "
+        + "aggs=[[SUM($1), COUNT($2)]], sort0=[0], dir0=[ASC], fetch=[3])";
+    sql(sql)
+        .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=5957",
+            "M=1997-02-01 00:00:00; S=20957; C=5842",
+            "M=1997-03-01 00:00:00; S=23706; C=6528")
+        .explainContains(explain);
+  }
+
+  @Test public void testGroupByDayGranularity() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to DAY) order by c desc";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart'";
+    sql(sql)
+        .limit(3)
+        .queryContains(druidChecker(druidQuery))
+        .returnsOrdered("S=3850; C=1230", "S=3342; C=1071", "S=3219; C=1024");
+  }
+
+  @Test public void testGroupByMonthGranularityFiltered() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + " count(\"store_sqft\") as c\n"
+        + "from \"foodmart\"\n"
+        + "where \"timestamp\" >= '1996-01-01 00:00:00' and "
+        + " \"timestamp\" < '1998-01-01 00:00:00'\n"
+        + "group by floor(\"timestamp\" to MONTH) order by s asc";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart'";
+
+    sql(sql)
+        .limit(3)
+        .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; C=5591")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testTopNMonthGranularity() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + "max(\"unit_sales\") as m,\n"
+        + "\"state_province\" as p\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\", floor(\"timestamp\" to MONTH)\n"
+        + "order by s desc limit 3";
+    // Cannot use a Druid "topN" query because there is more than one
+    // grouping dimension; have to use a "groupBy" query with a limitSpec
+    // that sorts and fetches.
+    final String explain = "PLAN="
+        + "EnumerableCalc(expr#0..3=[{inputs}], S=[$t2], M=[$t3], P=[$t0])\n"
+        + "  EnumerableInterpreter\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$30, FLOOR"
+        + "($0, FLAG(MONTH)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]], sort0=[2], "
+        + "dir0=[DESC], fetch=[3])";
+    final String druidQueryPart1 = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default',"
+        + "'dimension':'state_province',\"outputName\":\"state_province\",\"outputType\":\"STRING\"},"
+        + "{'type':'extraction','dimension':'__time',"
+        + "'outputName':'floor_month','extractionFn':{'type':'timeFormat','format'";
+    final String druidQueryPart2 = "'limitSpec':{'type':'default','limit':3,"
+        + "'columns':[{'dimension':'S','direction':'descending',"
+        + "'dimensionOrder':'numeric'}]},'aggregations':[{'type':'longSum',"
+        + "'name':'S','fieldName':'unit_sales'},{'type':'longMax','name':'M',"
+        + "'fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql)
+        .returnsUnordered("S=12399; M=6; P=WA",
+            "S=12297; M=7; P=WA",
+            "S=10640; M=6; P=WA")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQueryPart1, druidQueryPart2));
+  }
+
+  @Test public void testTopNDayGranularityFiltered() {
+    final String sql = "select sum(\"unit_sales\") as s,\n"
+        + "max(\"unit_sales\") as m,\n"
+        + "\"state_province\" as p\n"
+        + "from \"foodmart\"\n"
+        + "where \"timestamp\" >= '1997-01-01 00:00:00' and "
+        + " \"timestamp\" < '1997-09-01 00:00:00'\n"
+        + "group by \"state_province\", floor(\"timestamp\" to DAY)\n"
+        + "order by s desc limit 6";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(S=[$2], M=[$3], P=[$0])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1997-01-01T00:00:00.000Z/1997-09-01T00:00:00.000Z]], projects=[[$30, FLOOR"
+        + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2), MAX($2)]], sort0=[2], "
+        + "dir0=[DESC], fetch=[6])";
+    final String druidQueryType = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions'";
+    final String limitSpec = "'limitSpec':{'type':'default','limit':6,"
+        + "'columns':[{'dimension':'S','direction':'descending','dimensionOrder':'numeric'}]}";
+    sql(sql)
+        .returnsOrdered("S=2527; M=5; P=OR",
+            "S=2525; M=6; P=OR",
+            "S=2238; M=6; P=OR",
+            "S=1715; M=5; P=OR",
+            "S=1691; M=5; P=OR",
+            "S=1629; M=5; P=WA")
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQueryType, limitSpec));
+  }
+
+  @Test public void testGroupByHaving() {
+    final String sql = "select \"state_province\" as s, count(*) as c\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\" having count(*) > 23000 order by 1";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]], projects=[[$30]], groups=[{0}], aggs=[[COUNT()]], "
+        + "filter=[>($1, 23000)], sort0=[0], dir0=[ASC])";
+    sql(sql)
+        .returnsOrdered("S=CA; C=24441",
+            "S=WA; C=40778")
+        .explainContains(explain);
+  }
+
+  @Test public void testGroupComposite() {
+    // Note: the SORT-LIMIT is pushed into the DruidQuery; only the column
+    // reordering (BindableProject) remains outside.
+    final String sql = "select count(*) as c, \"state_province\", \"city\"\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\", \"city\"\n"
+        + "order by c desc limit 2";
+    final String explain = "BindableProject(C=[$2], state_province=[$0], city=[$1])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$30, $29]], groups=[{0, 1}], aggs=[[COUNT()]], sort0=[2], dir0=[DESC], fetch=[2])";
+    sql(sql)
+        .returnsOrdered("C=7394; state_province=WA; city=Spokane",
+            "C=3958; state_province=WA; city=Olympia")
+        .explainContains(explain);
+  }
+
+  /** Tests a distinct-count query. The count is wrapped in FLOOR so that the
+   * result stays correct even if Druid evaluates it approximately using
+   * "cardinality"; in the plan below, the city/state grouping is pushed into
+   * the DruidQuery and the distinct count is evaluated on top of it. */
+  @Test public void testDistinctCount() {
+    final String sql = "select \"state_province\",\n"
+        + " floor(count(distinct \"city\")) as cdc\n"
+        + "from \"foodmart\"\n"
+        + "group by \"state_province\"\n"
+        + "order by 2 desc limit 2";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableSort(sort0=[$1], dir0=[DESC], fetch=[2])\n"
+        + "    BindableProject(state_province=[$0], CDC=[FLOOR($1)])\n"
+        + "      BindableAggregate(group=[{1}], agg#0=[COUNT($0)])\n"
+        + "        DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{29, 30}], "
+        + "aggs=[[]])";
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'default','dimension':'city','outputName':'city'"
+        + ",'outputType':'STRING'},"
+        + "{'type':'default','dimension':'state_province','outputName':'state_province','outputType':'STRING'}],"
+        + "'limitSpec':{'type':'default'},'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql)
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("state_province=CA; CDC=45",
+            "state_province=WA; CDC=22");
+  }
+
+  /** Tests that projections of columns are pushed into the DruidQuery, and
+   * projections of expressions that Druid cannot handle (in this case, a
+   * literal 0) stay up. */
+  @Test public void testProject() {
+    final String sql = "select \"product_name\", 0 as zero\n"
+        + "from \"foodmart\"\n"
+        + "order by \"product_name\"";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$3, 0]])";
+    sql(sql)
+        .limit(2)
+        .returnsUnordered("product_name=ADJ Rosy Sunglasses; ZERO=0",
+            "product_name=ADJ Rosy Sunglasses; ZERO=0")
+        .explainContains(explain);
+  }
+
+  @Test public void testFilterDistinct() {
+    final String sql = "select distinct \"state_province\", \"city\",\n"
+        + "  \"product_name\"\n"
+        + "from \"foodmart\"\n"
+        + "where \"product_name\" = 'High Top Dried Mushrooms'\n"
+        + "and \"quarter\" in ('Q2', 'Q3')\n"
+        + "and \"state_province\" = 'WA'";
+    final String druidQuery1 = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all'";
+    final String druidQuery2 = "'filter':{'type':'and','fields':[{'type':'selector','dimension':"
+        + "'product_name','value':'High Top Dried Mushrooms'},{'type':'or','fields':[{'type':'selector',"
+        + "'dimension':'quarter','value':'Q2'},{'type':'selector','dimension':'quarter',"
+        + "'value':'Q3'}]},{'type':'selector','dimension':'state_province','value':'WA'}]},"
+        + "'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+        + "2992-01-10T00:00:00.000Z]],"
+        + " filter=[AND(=($3, 'High Top Dried Mushrooms'),"
+        + " OR(=($87, 'Q2'),"
+        + " =($87, 'Q3')),"
+        + " =($30, 'WA'))],"
+        + " projects=[[$30, $29, $3]], groups=[{0, 1, 2}], aggs=[[]])\n";
+    sql(sql)
+        .queryContains(druidChecker(druidQuery1, druidQuery2))
+        .explainContains(explain)
+        .returnsUnordered(
+            "state_province=WA; city=Bremerton; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Everett; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Kirkland; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Lynnwood; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Olympia; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Port Orchard; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Puyallup; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Spokane; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Tacoma; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Yakima; product_name=High Top Dried Mushrooms");
+  }
+
+  @Test public void testFilter() {
+    final String sql = "select \"state_province\", \"city\",\n"
+        + "  \"product_name\"\n"
+        + "from \"foodmart\"\n"
+        + "where \"product_name\" = 'High Top Dried Mushrooms'\n"
+        + "and \"quarter\" in ('Q2', 'Q3')\n"
+        + "and \"state_province\" = 'WA'";
+    final String druidQuery = "{'queryType':'scan',"
+        + "'dataSource':'foodmart',"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+        + "'filter':{'type':'and','fields':["
+        + "{'type':'selector','dimension':'product_name','value':'High Top Dried Mushrooms'},"
+        + "{'type':'or','fields':["
+        + "{'type':'selector','dimension':'quarter','value':'Q2'},"
+        + "{'type':'selector','dimension':'quarter','value':'Q3'}]},"
+        + "{'type':'selector','dimension':'state_province','value':'WA'}]},"
+        + "'columns':['state_province','city','product_name'],"
+        + "'resultFormat':'compactedList'}";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+        + "filter=[AND(=($3, 'High Top Dried Mushrooms'), "
+        + "OR(=($87, 'Q2'), =($87, 'Q3')), =($30, 'WA'))], "
+        + "projects=[[$30, $29, $3]])\n";
+    sql(sql)
+        .queryContains(druidChecker(druidQuery))
+        .explainContains(explain)
+        .returnsUnordered(
+            "state_province=WA; city=Bremerton; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Everett; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Kirkland; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Lynnwood; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Olympia; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Port Orchard; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Puyallup; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Puyallup; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Spokane; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Spokane; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Spokane; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Tacoma; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Yakima; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Yakima; product_name=High Top Dried Mushrooms",
+            "state_province=WA; city=Yakima; product_name=High Top Dried Mushrooms");
+  }
+
+  /** Tests that conditions applied to time units extracted via the EXTRACT
+   * function become ranges on the timestamp column.
+   *
+   * <p>Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1334">[CALCITE-1334]
+   * Convert predicates on EXTRACT function calls into date ranges</a>. */
+  @Test public void testFilterTimestamp() {
+    String sql = "select count(*) as c\n"
+        + "from \"foodmart\"\n"
+        + "where extract(year from \"timestamp\") = 1997\n"
+        + "and extract(month from \"timestamp\") in (4, 6)\n";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1997-04-01T00:00:00.000Z/"
+        + "1997-05-01T00:00:00.000Z, 1997-06-01T00:00:00.000Z/1997-07-01T00:00:00.000Z]],"
+        + " projects=[[0]], groups=[{}], aggs=[[COUNT()]])";
+    sql(sql)
+        .returnsUnordered("C=13500")
+        .explainContains(explain);
+  }
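+
+  // A sketch of the rewrite the test above exercises (half-open intervals):
+  //   extract(year from "timestamp") = 1997
+  //   and extract(month from "timestamp") in (4, 6)
+  //   =>  "timestamp" in [1997-04-01, 1997-05-01) union [1997-06-01, 1997-07-01)
+  // matching the two intervals in the expected plan.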
+
+  @Test public void testFilterSwapped() {
+    String sql = "select \"state_province\"\n"
+        + "from \"foodmart\"\n"
+        + "where 'High Top Dried Mushrooms' = \"product_name\"";
+    final String explain = "EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=('High Top Dried Mushrooms', $3)], projects=[[$30]])";
+    final String druidQuery = "'filter':{'type':'selector','dimension':'product_name',"
+        + "'value':'High Top Dried Mushrooms'}";
+    sql(sql)
+        .explainContains(explain)
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testGroupByMetricAndExtractTime() {
+    final String sql =
+        "SELECT count(*), floor(\"timestamp\" to DAY), \"store_sales\" "
+            + "FROM \"foodmart\"\n"
+            + "GROUP BY \"store_sales\", floor(\"timestamp\" to DAY)\n ORDER BY \"store_sales\" DESC\n"
+            + "LIMIT 10\n";
+    sql(sql).queryContains(druidChecker("{\"queryType\":\"groupBy\""));
+  }
+
+  @Test public void testFilterOnDouble() {
+    String sql = "select \"product_id\" from \"foodmart\"\n"
+        + "where cast(\"product_id\" as double) < 0.41024 and \"product_id\" < 12223";
+    sql(sql).queryContains(
+        druidChecker("'type':'bound','dimension':'product_id','upper':'0.41024'",
+            "'upper':'12223'"));
+  }
+
+  @Test public void testPushAggregateOnTime() {
+    String sql = "select \"product_id\", \"timestamp\" as \"time\" "
+        + "from \"foodmart\" "
+        + "where \"product_id\" = 1016 "
+        + "and \"timestamp\" < '1997-01-03 00:00:00' "
+        + "and \"timestamp\" > '1990-01-01 00:00:00' "
+        + "group by \"timestamp\", \"product_id\" ";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract',"
+        + "'extractionFn':{'type':'timeFormat','format':'yyyy-MM-dd";
+    sql(sql)
+        .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractYear() {
+    String sql = "select EXTRACT( year from \"timestamp\") as \"year\",\"product_id\" from "
+        + "\"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1999-01-02' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_year',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .returnsUnordered("year=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractMonth() {
+    String sql = "select EXTRACT( month from \"timestamp\") as \"month\",\"product_id\" from "
+        + "\"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( month from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_month',"
+                    + "'extractionFn':{'type':'timeFormat','format':'M',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .returnsUnordered("month=1; product_id=1016", "month=2; product_id=1016",
+            "month=3; product_id=1016", "month=4; product_id=1016", "month=5; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractDay() {
+    String sql = "select EXTRACT( day from \"timestamp\") as \"day\","
+        + "\"product_id\" from \"foodmart\""
+        + " where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)" + " group by "
+        + " EXTRACT( day from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_day',"
+                    + "'extractionFn':{'type':'timeFormat','format':'d',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .returnsUnordered("day=2; product_id=1016", "day=10; product_id=1016",
+            "day=13; product_id=1016", "day=16; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractHourOfDay() {
+    String sql =
+        "select EXTRACT( hour from \"timestamp\") as \"hourOfDay\",\"product_id\"  from "
+            + "\"foodmart\" where \"product_id\" = 1016 and "
+            + "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
+            + "('1997-01-01' as timestamp)" + " group by "
+            + " EXTRACT( hour from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(druidChecker("'queryType':'groupBy'"))
+        .returnsUnordered("hourOfDay=0; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractYearMonthDay() {
+    String sql = "select EXTRACT( day from \"timestamp\") as \"day\", EXTRACT( month from "
+        + "\"timestamp\") as \"month\",  EXTRACT( year from \"timestamp\") as \"year\",\""
+        + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'",
+                "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_day',"
+                    + "'extractionFn':{'type':'timeFormat','format':'d',"
+                    + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_month',"
+                    + "'extractionFn':{'type':'timeFormat','format':'M',"
+                    + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_year',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001Z/1997-01-20T00:00:00.000Z]], "
+            + "filter=[=($1, 1016)], projects=[[EXTRACT(FLAG(DAY), $0), EXTRACT(FLAG(MONTH), $0), "
+            + "EXTRACT(FLAG(YEAR), $0), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+        .returnsUnordered("day=2; month=1; year=1997; product_id=1016",
+            "day=10; month=1; year=1997; product_id=1016",
+            "day=13; month=1; year=1997; product_id=1016",
+            "day=16; month=1; year=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractYearMonthDayWithOutRenaming() {
+    String sql = "select EXTRACT( day from \"timestamp\"), EXTRACT( month from "
+        + "\"timestamp\"), EXTRACT( year from \"timestamp\"),\""
+        + "product_id\"  from \"foodmart\" where \"product_id\" = 1016 and "
+        + "\"timestamp\" < cast('1997-01-20' as timestamp) and \"timestamp\" > cast"
+        + "('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( month from \"timestamp\"),"
+        + " EXTRACT( year from \"timestamp\"), \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_day',"
+                    + "'extractionFn':{'type':'timeFormat','format':'d',"
+                    + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_month',"
+                    + "'extractionFn':{'type':'timeFormat','format':'M',"
+                    + "'timeZone':'UTC','locale':'en-US'}}", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_year',"
+                    + "'extractionFn':{'type':'timeFormat','format':'yyyy',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001Z/1997-01-20T00:00:00.000Z]], "
+            + "filter=[=($1, 1016)], projects=[[EXTRACT(FLAG(DAY), $0), EXTRACT(FLAG(MONTH), $0), "
+            + "EXTRACT(FLAG(YEAR), $0), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+        .returnsUnordered("EXPR$0=2; EXPR$1=1; EXPR$2=1997; product_id=1016",
+            "EXPR$0=10; EXPR$1=1; EXPR$2=1997; product_id=1016",
+            "EXPR$0=13; EXPR$1=1; EXPR$2=1997; product_id=1016",
+            "EXPR$0=16; EXPR$1=1; EXPR$2=1997; product_id=1016");
+  }
+
+  @Test public void testPushAggregateOnTimeWithExtractWithOutRenaming() {
+    String sql = "select EXTRACT( day from \"timestamp\"), "
+        + "\"product_id\" as \"dayOfMonth\" from \"foodmart\" "
+        + "where \"product_id\" = 1016 and \"timestamp\" < cast('1997-01-20' as timestamp) "
+        + "and \"timestamp\" > cast('1997-01-01' as timestamp)"
+        + " group by "
+        + " EXTRACT( day from \"timestamp\"), EXTRACT( day from \"timestamp\"),"
+        + " \"product_id\" ";
+    sql(sql)
+        .queryContains(
+            druidChecker(
+                ",'granularity':'all'", "{'type':'extraction',"
+                    + "'dimension':'__time','outputName':'extract_day',"
+                    + "'extractionFn':{'type':'timeFormat','format':'d',"
+                    + "'timeZone':'UTC','locale':'en-US'}}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.001Z/1997-01-20T00:00:00.000Z]], "
+            + "filter=[=($1, 1016)], projects=[[EXTRACT(FLAG(DAY), $0), $1]], "
+            + "groups=[{0, 1}], aggs=[[]])\n")
+        .returnsUnordered("EXPR$0=2; dayOfMonth=1016", "EXPR$0=10; dayOfMonth=1016",
+            "EXPR$0=13; dayOfMonth=1016", "EXPR$0=16; dayOfMonth=1016");
+  }
+
+  @Test public void testPushComplexFilter() {
+    String sql = "select sum(\"store_sales\") from \"foodmart\" "
+        + "where EXTRACT( year from \"timestamp\") = 1997 and "
+        + "\"cases_per_pallet\" >= 8 and \"cases_per_pallet\" <= 10 and "
+        + "\"units_per_case\" < 15 ";
+    String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+        + "'granularity':'all','filter':{'type':'and','fields':[{'type':'bound','dimension':"
+        + "'cases_per_pallet','lower':'8','lowerStrict':false,'ordering':'numeric'},"
+        + "{'type':'bound','dimension':'cases_per_pallet','upper':'10','upperStrict':false,"
+        + "'ordering':'numeric'},{'type':'bound','dimension':'units_per_case','upper':'15',"
+        + "'upperStrict':true,'ordering':'numeric'}]},'aggregations':[{'type':'doubleSum',"
+        + "'name':'EXPR$0','fieldName':'store_sales'}],'intervals':['1997-01-01T00:00:00.000Z/"
+        + "1998-01-01T00:00:00.000Z'],'context':{'skipEmptyBuckets':true}}";
+    sql(sql)
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1997-01-01T00:00:00.000Z/1998-01-01T00:00:00.000Z]], "
+            + "filter=[AND(>=(CAST($11):BIGINT, 8), <=(CAST($11):BIGINT, 10), "
+            + "<(CAST($10):BIGINT, 15))], groups=[{}], "
+            + "aggs=[[SUM($90)]])\n")
+        .returnsUnordered("EXPR$0=75364.1")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testPushOfFilterExtractionOnDayAndMonth() {
+    String sql = "SELECT \"product_id\" , EXTRACT(day from \"timestamp\"), EXTRACT(month from "
+        + "\"timestamp\") from \"foodmart\" WHERE  EXTRACT(day from \"timestamp\") >= 30 AND "
+        + "EXTRACT(month from \"timestamp\") = 11 "
+        + "AND  \"product_id\" >= 1549 group by \"product_id\", EXTRACT(day from "
+        + "\"timestamp\"), EXTRACT(month from \"timestamp\")";
+    sql(sql)
+        .returnsUnordered("product_id=1549; EXPR$1=30; EXPR$2=11",
+            "product_id=1553; EXPR$1=30; EXPR$2=11");
+  }
+
+  @Test public void testPushOfFilterExtractionOnDayAndMonthAndYear() {
+    String sql = "SELECT \"product_id\" , EXTRACT(day from \"timestamp\"), EXTRACT(month from "
+        + "\"timestamp\") , EXTRACT(year from \"timestamp\") from \"foodmart\" "
+        + "WHERE  EXTRACT(day from \"timestamp\") >= 30 AND EXTRACT(month from \"timestamp\") = 11 "
+        + "AND  \"product_id\" >= 1549 AND EXTRACT(year from \"timestamp\") = 1997"
+        + "group by \"product_id\", EXTRACT(day from \"timestamp\"), "
+        + "EXTRACT(month from \"timestamp\"), EXTRACT(year from \"timestamp\")";
+    sql(sql)
+        .returnsUnordered("product_id=1549; EXPR$1=30; EXPR$2=11; EXPR$3=1997",
+            "product_id=1553; EXPR$1=30; EXPR$2=11; EXPR$3=1997")
+        .queryContains(
+            druidChecker("{'queryType':'groupBy','dataSource':'foodmart','granularity':'all'"));
+  }
+
+  @Test public void testFilterExtractionOnMonthWithBetween() {
+    String sqlQuery = "SELECT \"product_id\", EXTRACT(month from \"timestamp\") FROM \"foodmart\""
+        + " WHERE EXTRACT(month from \"timestamp\") BETWEEN 10 AND 11 AND  \"product_id\" >= 1558"
+        + " GROUP BY \"product_id\", EXTRACT(month from \"timestamp\")";
+    String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart'";
+    sql(sqlQuery)
+        .returnsUnordered("product_id=1558; EXPR$1=10", "product_id=1558; EXPR$1=11",
+            "product_id=1559; EXPR$1=11")
+        .queryContains(druidChecker(druidQuery));
+  }
+
+  @Test public void testFilterExtractionOnMonthWithIn() {
+    String sqlQuery = "SELECT \"product_id\", EXTRACT(month from \"timestamp\") FROM \"foodmart\""
+        + " WHERE EXTRACT(month from \"timestamp\") IN (10, 11) AND  \"product_id\" >= 1558"
+        + " GROUP BY \"product_id\", EXTRACT(month from \"timestamp\")";
+    sql(sqlQuery)
+        .returnsUnordered("product_id=1558; EXPR$1=10", "product_id=1558; EXPR$1=11",
+            "product_id=1559; EXPR$1=11")
+        .queryContains(
+            druidChecker("{'queryType':'groupBy',"
+                + "'dataSource':'foodmart','granularity':'all',"
+                + "'dimensions':[{'type':'default','dimension':'product_id','outputName':'product_id','outputType':'STRING'},"
+                + "{'type':'extraction','dimension':'__time','outputName':'extract_month',"
+                + "'extractionFn':{'type':'timeFormat','format':'M','timeZone':'UTC',"
+                + "'locale':'en-US'}}],'limitSpec':{'type':'default'},"
+                + "'filter':{'type':'and','fields':[{'type':'bound',"
+                + "'dimension':'product_id','lower':'1558','lowerStrict':false,"
+                + "'ordering':'numeric'},{'type':'or','fields':[{'type':'bound','dimension':'__time'"
+                + ",'lower':'10','lowerStrict':false,'upper':'10','upperStrict':false,"
+                + "'ordering':'numeric','extractionFn':{'type':'timeFormat',"
+                + "'format':'M','timeZone':'UTC','locale':'en-US'}},{'type':'bound',"
+                + "'dimension':'__time','lower':'11','lowerStrict':false,'upper':'11',"
+                + "'upperStrict':false,'ordering':'numeric','extractionFn':{'type':'timeFormat',"
+                + "'format':'M','timeZone':'UTC','locale':'en-US'}}]}]},"
+                + "'aggregations':[],"
+                + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"));
+  }
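+
+  // Unlike testFilterTimestamp, the IN list here carries no year bound, so it
+  // cannot be rewritten into intervals; it is pushed instead as an OR of
+  // 'bound' filters on '__time' with a timeFormat extractionFn.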
+
+  @Test public void testPushOfOrderByWithMonthExtract() {
+    String sqlQuery = "SELECT  extract(month from \"timestamp\") as m , \"product_id\", SUM"
+        + "(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY extract(month from \"timestamp\"), \"product_id\" order by m, s, "
+        + "\"product_id\"";
+    sql(sqlQuery).queryContains(
+        druidChecker("{'queryType':'groupBy','dataSource':'foodmart',"
+            + "'granularity':'all','dimensions':[{'type':'extraction',"
+            + "'dimension':'__time','outputName':'extract_month',"
+            + "'extractionFn':{'type':'timeFormat','format':'M','timeZone':'UTC',"
+            + "'locale':'en-US'}},{'type':'default','dimension':'product_id','outputName':"
+            + "'product_id','outputType':'STRING'}],"
+            + "'limitSpec':{'type':'default','columns':[{'dimension':'extract_month',"
+            + "'direction':'ascending','dimensionOrder':'numeric'},{'dimension':'S',"
+            + "'direction':'ascending','dimensionOrder':'numeric'},"
+            + "{'dimension':'product_id','direction':'ascending',"
+            + "'dimensionOrder':'lexicographic'}]},'filter':{'type':'bound',"
+            + "'dimension':'product_id','lower':'1558','lowerStrict':false,"
+            + "'ordering':'numeric'},'aggregations':[{'type':'longSum','name':'S',"
+            + "'fieldName':'unit_sales'}],"
+            + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], "
+            + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+            + "filter=[>=(CAST($1):BIGINT, 1558)], projects=[[EXTRACT(FLAG(MONTH), $0), $1, $89]], "
+            + "groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[0], sort1=[2], sort2=[1], "
+            + "dir0=[ASC], dir1=[ASC], dir2=[ASC])");
+  }
+
+  @Test public void testGroupByFloorTimeWithoutLimit() {
+    final String sql = "select floor(\"timestamp\" to MONTH) as \"month\"\n"
+        + "from \"foodmart\"\n"
+        + "group by floor(\"timestamp\" to MONTH)\n"
+        + "order by \"month\" DESC";
+    sql(sql)
+        .queryContains(druidChecker("'queryType':'timeseries'", "'descending':true"))
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z"
+            + "/2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, FLAG(MONTH))]], groups=[{0}], "
+            + "aggs=[[]], sort0=[0], dir0=[DESC])");
+  }
+
+  @Test public void testGroupByFloorTimeWithLimit() {
+    final String sql =
+        "select floor(\"timestamp\" to MONTH) as \"floorOfMonth\"\n"
+            + "from \"foodmart\"\n"
+            + "group by floor(\"timestamp\" to MONTH)\n"
+            + "order by \"floorOfMonth\" DESC LIMIT 3";
+    final String explain =
+        "PLAN=EnumerableInterpreter\n"
+            + "  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+            + "2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, FLAG(MONTH))]], groups=[{0}], "
+            + "aggs=[[]], sort0=[0], dir0=[DESC], fetch=[3])";
+    sql(sql)
+        .explainContains(explain)
+        .returnsOrdered("floorOfMonth=1997-12-01 00:00:00", "floorOfMonth=1997-11-01 00:00:00",
+            "floorOfMonth=1997-10-01 00:00:00")
+        .queryContains(druidChecker("'queryType':'groupBy'", "'direction':'descending'"));
+  }
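+
+  // Contrast with testGroupByFloorTimeWithoutLimit: without a LIMIT the
+  // descending sort rides on a 'timeseries' query ('descending':true); the
+  // LIMIT forces a 'groupBy' query with a descending limitSpec instead.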
+
+  @Test public void testPushOfOrderByYearWithYearMonthExtract() {
+    String sqlQuery = "SELECT year(\"timestamp\") as y, extract(month from \"timestamp\") as m , "
+        + "\"product_id\", SUM"
+        + "(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY year(\"timestamp\"), extract(month from \"timestamp\"), \"product_id\" order"
+        + " by y DESC, m ASC, s DESC, \"product_id\" LIMIT 3";
+    final String expectedPlan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+        + "filter=[>=(CAST($1):BIGINT, 1558)], projects=[[EXTRACT(FLAG(YEAR), $0), "
+        + "EXTRACT(FLAG(MONTH), $0), $1, $89]], groups=[{0, 1, 2}], aggs=[[SUM($3)]], sort0=[0], "
+        + "sort1=[1], sort2=[3], sort3=[2], dir0=[DESC], "
+        + "dir1=[ASC], dir2=[DESC], dir3=[ASC], fetch=[3])";
+    final String expectedDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract_year',"
+        + "'extractionFn':{'type':'timeFormat','format':'yyyy','timeZone':'UTC',"
+        + "'locale':'en-US'}},{'type':'extraction','dimension':'__time',"
+        + "'outputName':'extract_month','extractionFn':{'type':'timeFormat',"
+        + "'format':'M','timeZone':'UTC','locale':'en-US'}},{'type':'default',"
+        + "'dimension':'product_id','outputName':'product_id','outputType':'STRING'}],"
+        + "'limitSpec':{'type':'default','limit':3,"
+        + "'columns':[{'dimension':'extract_year','direction':'descending',"
+        + "'dimensionOrder':'numeric'},{'dimension':'extract_month',"
+        + "'direction':'ascending','dimensionOrder':'numeric'},{'dimension':'S',"
+        + "'direction':'descending','dimensionOrder':'numeric'},"
+        + "{'dimension':'product_id','direction':'ascending',"
+        + "'dimensionOrder':'lexicographic'}]},'filter':{'type':'bound',"
+        + "'dimension':'product_id','lower':'1558','lowerStrict':false,"
+        + "'ordering':'numeric'},'aggregations':[{'type':'longSum','name':'S',"
+        + "'fieldName':'unit_sales'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sqlQuery).explainContains(expectedPlan).queryContains(druidChecker(expectedDruidQuery))
+        .returnsOrdered("Y=1997; M=1; product_id=1558; S=6", "Y=1997; M=1; product_id=1559; S=6",
+            "Y=1997; M=2; product_id=1558; S=24");
+  }
+
+  @Test public void testPushOfOrderByMetricWithYearMonthExtract() {
+    String sqlQuery = "SELECT year(\"timestamp\") as y, extract(month from \"timestamp\") as m , "
+        + "\"product_id\", SUM(\"unit_sales\") as s FROM \"foodmart\""
+        + " WHERE \"product_id\" >= 1558"
+        + " GROUP BY year(\"timestamp\"), extract(month from \"timestamp\"), \"product_id\" order"
+        + " by s DESC, m DESC, \"product_id\" LIMIT 3";
+    final String expectedPlan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+        + "filter=[>=(CAST($1):BIGINT, 1558)], projects=[[EXTRACT(FLAG(YEAR), $0), "
+        + "EXTRACT(FLAG(MONTH), $0), $1, $89]], groups=[{0, 1, 2}], aggs=[[SUM($3)]], "
+        + "sort0=[3], sort1=[1], sort2=[2], dir0=[DESC], dir1=[DESC], dir2=[ASC], fetch=[3])";
+    final String expectedDruidQueryType = "'queryType':'groupBy'";
+    sql(sqlQuery)
+        .returnsOrdered("Y=1997; M=12; product_id=1558; S=30", "Y=1997; M=3; product_id=1558; S=29",
+            "Y=1997; M=5; product_id=1558; S=27")
+        .explainContains(expectedPlan)
+        .queryContains(druidChecker(expectedDruidQueryType));
+  }
+
+  @Test public void testGroupByTimeSortOverMetrics() {
+    final String sqlQuery = "SELECT count(*) as c , SUM(\"unit_sales\") as s,"
+        + " floor(\"timestamp\" to month)"
+        + " FROM \"foodmart\" group by floor(\"timestamp\" to month) order by s DESC";
+    sql(sqlQuery)
+        .returnsOrdered("C=8716; S=26796; EXPR$2=1997-12-01 00:00:00",
+            "C=8231; S=25270; EXPR$2=1997-11-01 00:00:00",
+            "C=7752; S=23763; EXPR$2=1997-07-01 00:00:00",
+            "C=7710; S=23706; EXPR$2=1997-03-01 00:00:00",
+            "C=7038; S=21697; EXPR$2=1997-08-01 00:00:00",
+            "C=7033; S=21628; EXPR$2=1997-01-01 00:00:00",
+            "C=6912; S=21350; EXPR$2=1997-06-01 00:00:00",
+            "C=6865; S=21081; EXPR$2=1997-05-01 00:00:00",
+            "C=6844; S=20957; EXPR$2=1997-02-01 00:00:00",
+            "C=6662; S=20388; EXPR$2=1997-09-01 00:00:00",
+            "C=6588; S=20179; EXPR$2=1997-04-01 00:00:00",
+            "C=6478; S=19958; EXPR$2=1997-10-01 00:00:00")
+        .queryContains(druidChecker("'queryType':'groupBy'"))
+        .explainContains("DruidQuery(table=[[foodmart, foodmart]],"
+            + " intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]],"
+            + " projects=[[FLOOR($0, FLAG(MONTH)), $89]], groups=[{0}], "
+            + "aggs=[[COUNT(), SUM($1)]], sort0=[2], dir0=[DESC])");
+  }
+
+  @Test public void testNumericOrderingOfOrderByOperatorFullTime() {
+    final String sqlQuery = "SELECT \"timestamp\" as \"timestamp\","
+        + " count(*) as c, SUM(\"unit_sales\") as s FROM "
+        + "\"foodmart\" group by \"timestamp\" order by \"timestamp\" DESC, c DESC, s LIMIT 5";
+    final String druidSubQuery = "'limitSpec':{'type':'default','limit':5,"
+        + "'columns':[{'dimension':'extract','direction':'descending',"
+        + "'dimensionOrder':'lexicographic'},{'dimension':'C',"
+        + "'direction':'descending','dimensionOrder':'numeric'},{'dimension':'S',"
+        + "'direction':'ascending','dimensionOrder':'numeric'}]},"
+        + "'aggregations':[{'type':'count','name':'C'},{'type':'longSum',"
+        + "'name':'S','fieldName':'unit_sales'}]";
+    sql(sqlQuery).returnsOrdered("timestamp=1997-12-30 00:00:00; C=22; S=36\ntimestamp=1997-12-29"
+        + " 00:00:00; C=321; S=982\ntimestamp=1997-12-28 00:00:00; C=480; "
+        + "S=1496\ntimestamp=1997-12-27 00:00:00; C=363; S=1156\ntimestamp=1997-12-26 00:00:00; "
+        + "C=144; S=420").queryContains(druidChecker(druidSubQuery));
+  }
+
+  @Test public void testNumericOrderingOfOrderByOperatorTimeExtract() {
+    final String sqlQuery = "SELECT extract(day from \"timestamp\") as d, extract(month from "
+        + "\"timestamp\") as m,  year(\"timestamp\") as y , count(*) as c, SUM(\"unit_sales\")  "
+        + "as s FROM "
+        + "\"foodmart\" group by  extract(day from \"timestamp\"), extract(month from \"timestamp\"), "
+        + "year(\"timestamp\")  order by d DESC, m ASC, y DESC LIMIT 5";
+    final String druidSubQuery = "'limitSpec':{'type':'default','limit':5,"
+        + "'columns':[{'dimension':'extract_day','direction':'descending',"
+        + "'dimensionOrder':'numeric'},{'dimension':'extract_month',"
+        + "'direction':'ascending','dimensionOrder':'numeric'},"
+        + "{'dimension':'extract_year','direction':'descending',"
+        + "'dimensionOrder':'numeric'}]}";
+    sql(sqlQuery).returnsOrdered("D=30; M=3; Y=1997; C=114; S=351\nD=30; M=5; Y=1997; "
+        + "C=24; S=34\nD=30; M=6; Y=1997; C=73; S=183\nD=30; M=7; Y=1997; C=29; S=54\nD=30; M=8; "
+        + "Y=1997; C=137; S=422").queryContains(druidChecker(druidSubQuery));
+  }
+
+  @Test public void testNumericOrderingOfOrderByOperatorStringDims() {
+    final String sqlQuery = "SELECT \"brand_name\", count(*) as c, SUM(\"unit_sales\")  "
+        + "as s FROM "
+        + "\"foodmart\" group by \"brand_name\" order by \"brand_name\"  DESC LIMIT 5";
+    final String druidSubQuery = "'limitSpec':{'type':'default','limit':5,"
+        + "'columns':[{'dimension':'brand_name','direction':'descending',"
+        + "'dimensionOrder':'lexicographic'}]}";
+    sql(sqlQuery).returnsOrdered("brand_name=Washington; C=576; S=1775\nbrand_name=Walrus; C=457;"
+        + " S=1399\nbrand_name=Urban; C=299; S=924\nbrand_name=Tri-State; C=2339; "
+        + "S=7270\nbrand_name=Toucan; C=123; S=380").queryContains(druidChecker(druidSubQuery));
+  }
+
+  @Test public void testGroupByWeekExtract() {
+    final String sql = "SELECT extract(week from \"timestamp\") from \"foodmart\" where "
+        + "\"product_id\" = 1558 and extract(week from \"timestamp\") IN (10, 11) group by extract"
+        + "(week from \"timestamp\")";
+
+    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+        + "'granularity':'all','dimensions':[{'type':'extraction',"
+        + "'dimension':'__time','outputName':'extract_week',"
+        + "'extractionFn':{'type':'timeFormat','format':'w','timeZone':'UTC',"
+        + "'locale':'en-US'}}],'limitSpec':{'type':'default'},"
+        + "'filter':{'type':'and','fields':[{'type':'bound','dimension':'product_id',"
+        + "'lower':'1558','lowerStrict':false,'upper':'1558','upperStrict':false,"
+        + "'ordering':'numeric'},{'type':'or',"
+        + "'fields':[{'type':'bound','dimension':'__time','lower':'10','lowerStrict':false,"
+        + "'upper':'10','upperStrict':false,'ordering':'numeric',"
+        + "'extractionFn':{'type':'timeFormat','format':'w','timeZone':'UTC',"
+        + "'locale':'en-US'}},{'type':'bound','dimension':'__time','lower':'11','lowerStrict':false,"
+        + "'upper':'11','upperStrict':false,'ordering':'numeric',"
+        + "'extractionFn':{'type':'timeFormat','format':'w',"
+        + "'timeZone':'UTC','locale':'en-US'}}]}]},"
+        + "'aggregations':[],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
+    sql(sql).returnsOrdered("EXPR$0=10\nEXPR$0=11").queryContains(druidChecker(druidQuery));
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1765">[CALCITE-1765]
+   * Druid adapter: Gracefully handle granularity that cannot be pushed to
+   * extraction function</a>. */
+  @Test public void testTimeExtractThatCannotBePushed() {
+    final String sql = "SELECT extract(

<TRUNCATED>

[3/3] calcite git commit: [CALCITE-2286] Support timestamp type for Druid adapter

Posted by jc...@apache.org.
[CALCITE-2286] Support timestamp type for Druid adapter

Close apache/calcite#681


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/b29397d9
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/b29397d9
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/b29397d9

Branch: refs/heads/master
Commit: b29397d92e16f8aa77d5e84cf7a21744cd1b19a9
Parents: c12cb4b
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Apr 26 16:29:15 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Fri Jun 22 11:49:06 2018 -0700

----------------------------------------------------------------------
 .../adapter/druid/CeilOperatorConversion.java   |   13 +-
 .../adapter/druid/DruidDateTimeUtils.java       |  118 +-
 .../calcite/adapter/druid/DruidExpressions.java |    8 +-
 .../calcite/adapter/druid/DruidJsonFilter.java  |   34 +-
 .../calcite/adapter/druid/DruidQuery.java       |   60 +-
 .../calcite/adapter/druid/DruidRules.java       |    4 +-
 .../adapter/druid/DruidSqlCastConverter.java    |   37 +-
 .../adapter/druid/DruidTableFactory.java        |   27 +-
 .../druid/ExtractOperatorConversion.java        |    9 +-
 .../adapter/druid/FloorOperatorConversion.java  |   12 +-
 .../adapter/druid/TimeExtractionFunction.java   |   40 +-
 .../org/apache/calcite/test/DruidAdapterIT.java |  107 +-
 .../apache/calcite/test/DruidAdapterIT2.java    | 3979 ++++++++++++++++++
 .../calcite/test/DruidDateRangeRulesTest.java   |    4 +-
 .../druid-foodmart-model-timestamp.json         |  153 +
 .../test/resources/druid-foodmart-model.json    |    5 +-
 site/_docs/druid_adapter.md                     |    5 +-
 17 files changed, 4471 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
index 7f15307..30c01e5 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -23,6 +24,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.util.TimeZone;
 
@@ -63,11 +65,14 @@ public class CeilOperatorConversion implements DruidSqlOperatorConverter {
       if (isoPeriodFormat == null) {
         return null;
       }
+      final TimeZone tz;
+      if (arg.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+        tz = TimeZone.getTimeZone(query.getConnectionConfig().timeZone());
+      } else {
+        tz = DateTimeUtils.UTC_ZONE;
+      }
       return DruidExpressions.applyTimestampCeil(
-          druidExpression,
-          isoPeriodFormat,
-          "",
-          TimeZone.getTimeZone(query.getConnectionConfig().timeZone()));
+          druidExpression, isoPeriodFormat, "", tz);
     } else {
       return null;
     }
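
The hunk above pins plain TIMESTAMP (and DATE) operands to UTC and reserves the
connection time zone for TIMESTAMP WITH LOCAL TIME ZONE; the same rule appears
below in DruidDateTimeUtils.extractGranularity. A minimal standalone sketch of
the selection rule (the helper name and String-typed argument are illustrative;
the commit inlines this logic using SqlTypeName):

  import java.util.TimeZone;

  final class TimeZoneSelection {
    private TimeZoneSelection() {}

    // TIMESTAMP and DATE are interpreted in UTC; only TIMESTAMP WITH LOCAL
    // TIME ZONE follows the connection's "timeZone" setting.
    static TimeZone forOperandType(String sqlTypeName, String connectionTimeZone) {
      return "TIMESTAMP_WITH_LOCAL_TIME_ZONE".equals(sqlTypeName)
          ? TimeZone.getTimeZone(connectionTimeZone)
          : TimeZone.getTimeZone("UTC");
    }
  }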

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 91f5fa4..20da334 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.calcite.adapter.druid;
 
-import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -27,7 +26,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.DateString;
 import org.apache.calcite.util.TimestampString;
-import org.apache.calcite.util.TimestampWithTimeZoneString;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
@@ -45,7 +43,6 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.TimeZone;
 
 import javax.annotation.Nullable;
 
@@ -66,9 +63,9 @@ public class DruidDateTimeUtils {
    * reference a single column: the timestamp column.
    */
   @Nullable
-  public static List<Interval> createInterval(RexNode e, String timeZone) {
-    final List<Range<TimestampString>> ranges =
-        extractRanges(e, TimeZone.getTimeZone(timeZone), false);
+  public static List<Interval> createInterval(RexNode e) {
+    final List<Range<Long>> ranges =
+        extractRanges(e, false);
     if (ranges == null) {
       // We did not succeed, bail out
       return null;
@@ -85,18 +82,18 @@ public class DruidDateTimeUtils {
   }
 
   protected static List<Interval> toInterval(
-      List<Range<TimestampString>> ranges) {
+      List<Range<Long>> ranges) {
     List<Interval> intervals = Lists.transform(ranges,
-        new Function<Range<TimestampString>, Interval>() {
-          public Interval apply(Range<TimestampString> range) {
+        new Function<Range<Long>, Interval>() {
+          public Interval apply(Range<Long> range) {
             if (!range.hasLowerBound() && !range.hasUpperBound()) {
               return DruidTable.DEFAULT_INTERVAL;
             }
             long start = range.hasLowerBound()
-                ? range.lowerEndpoint().getMillisSinceEpoch()
+                ? range.lowerEndpoint().longValue()
                 : DruidTable.DEFAULT_INTERVAL.getStartMillis();
             long end = range.hasUpperBound()
-                ? range.upperEndpoint().getMillisSinceEpoch()
+                ? range.upperEndpoint().longValue()
                 : DruidTable.DEFAULT_INTERVAL.getEndMillis();
             if (range.hasLowerBound()
                 && range.lowerBoundType() == BoundType.OPEN) {
@@ -116,8 +113,7 @@ public class DruidDateTimeUtils {
   }
 
   @Nullable
-  protected static List<Range<TimestampString>> extractRanges(RexNode node,
-      TimeZone timeZone, boolean withNot) {
+  protected static List<Range<Long>> extractRanges(RexNode node, boolean withNot) {
     switch (node.getKind()) {
     case EQUALS:
     case LESS_THAN:
@@ -126,17 +122,17 @@ public class DruidDateTimeUtils {
     case GREATER_THAN_OR_EQUAL:
     case BETWEEN:
     case IN:
-      return leafToRanges((RexCall) node, timeZone, withNot);
+      return leafToRanges((RexCall) node, withNot);
 
     case NOT:
-      return extractRanges(((RexCall) node).getOperands().get(0), timeZone, !withNot);
+      return extractRanges(((RexCall) node).getOperands().get(0), !withNot);
 
     case OR: {
       RexCall call = (RexCall) node;
-      List<Range<TimestampString>> intervals = Lists.newArrayList();
+      List<Range<Long>> intervals = Lists.newArrayList();
       for (RexNode child : call.getOperands()) {
-        List<Range<TimestampString>> extracted =
-            extractRanges(child, timeZone, withNot);
+        List<Range<Long>> extracted =
+            extractRanges(child, withNot);
         if (extracted != null) {
           intervals.addAll(extracted);
         }
@@ -146,10 +142,10 @@ public class DruidDateTimeUtils {
 
     case AND: {
       RexCall call = (RexCall) node;
-      List<Range<TimestampString>> ranges = new ArrayList<>();
+      List<Range<Long>> ranges = new ArrayList<>();
       for (RexNode child : call.getOperands()) {
-        List<Range<TimestampString>> extractedRanges =
-            extractRanges(child, timeZone, false);
+        List<Range<Long>> extractedRanges =
+            extractRanges(child, false);
         if (extractedRanges == null || extractedRanges.isEmpty()) {
           // We could not extract, we bail out
           return null;
@@ -158,7 +154,7 @@ public class DruidDateTimeUtils {
           ranges.addAll(extractedRanges);
           continue;
         }
-        List<Range<TimestampString>> overlapped = new ArrayList<>();
+        List<Range<Long>> overlapped = new ArrayList<>();
         for (Range current : ranges) {
           for (Range interval : extractedRanges) {
             if (current.isConnected(interval)) {
@@ -177,8 +173,7 @@ public class DruidDateTimeUtils {
   }
 
   @Nullable
-  protected static List<Range<TimestampString>> leafToRanges(RexCall call,
-      TimeZone timeZone, boolean withNot) {
+  protected static List<Range<Long>> leafToRanges(RexCall call, boolean withNot) {
     switch (call.getKind()) {
     case EQUALS:
     case LESS_THAN:
@@ -186,13 +181,13 @@ public class DruidDateTimeUtils {
     case GREATER_THAN:
     case GREATER_THAN_OR_EQUAL:
     {
-      final TimestampString value;
+      final Long value;
       if (call.getOperands().get(0) instanceof RexInputRef
-          && literalValue(call.getOperands().get(1), timeZone) != null) {
-        value = literalValue(call.getOperands().get(1), timeZone);
+          && literalValue(call.getOperands().get(1)) != null) {
+        value = literalValue(call.getOperands().get(1));
       } else if (call.getOperands().get(1) instanceof RexInputRef
-          && literalValue(call.getOperands().get(0), timeZone) != null) {
-        value = literalValue(call.getOperands().get(0), timeZone);
+          && literalValue(call.getOperands().get(0)) != null) {
+        value = literalValue(call.getOperands().get(0));
       } else {
         return null;
       }
@@ -214,12 +209,12 @@ public class DruidDateTimeUtils {
     }
     case BETWEEN:
     {
-      final TimestampString value1;
-      final TimestampString value2;
-      if (literalValue(call.getOperands().get(2), timeZone) != null
-          && literalValue(call.getOperands().get(3), timeZone) != null) {
-        value1 = literalValue(call.getOperands().get(2), timeZone);
-        value2 = literalValue(call.getOperands().get(3), timeZone);
+      final Long value1;
+      final Long value2;
+      if (literalValue(call.getOperands().get(2)) != null
+          && literalValue(call.getOperands().get(3)) != null) {
+        value1 = literalValue(call.getOperands().get(2));
+        value2 = literalValue(call.getOperands().get(3));
       } else {
         return null;
       }
@@ -234,10 +229,10 @@ public class DruidDateTimeUtils {
     }
     case IN:
     {
-      ImmutableList.Builder<Range<TimestampString>> ranges =
+      ImmutableList.Builder<Range<Long>> ranges =
           ImmutableList.builder();
       for (RexNode operand : Util.skip(call.operands)) {
-        final TimestampString element = literalValue(operand, timeZone);
+        final Long element = literalValue(operand);
         if (element == null) {
           return null;
         }
@@ -255,25 +250,29 @@ public class DruidDateTimeUtils {
     }
   }
 
+  /**
+   * Returns the literal value for the given node, assuming it is a literal with
+   * datetime type, or a cast that only alters nullability on top of a literal with
+   * datetime type.
+   */
   @Nullable
-  protected static TimestampString literalValue(RexNode node, TimeZone timeZone) {
+  protected static Long literalValue(RexNode node) {
     switch (node.getKind()) {
     case LITERAL:
       switch (((RexLiteral) node).getTypeName()) {
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return ((RexLiteral) node).getValueAs(TimestampString.class);
       case TIMESTAMP:
-        // Cast timestamp to timestamp with local time zone
-        final TimestampString t = ((RexLiteral) node).getValueAs(TimestampString.class);
-        return new TimestampWithTimeZoneString(t.toString() + " " + timeZone.getID())
-            .withTimeZone(DateTimeUtils.UTC_ZONE).getLocalTimestampString();
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        TimestampString tsVal = ((RexLiteral) node).getValueAs(TimestampString.class);
+        if (tsVal == null) {
+          return null;
+        }
+        return tsVal.getMillisSinceEpoch();
       case DATE:
-        // Cast date to timestamp with local time zone
-        final DateString d = ((RexLiteral) node).getValueAs(DateString.class);
-        return new TimestampWithTimeZoneString(
-            TimestampString.fromMillisSinceEpoch(
-                d.getMillisSinceEpoch()).toString() + " " + timeZone.getID())
-            .withTimeZone(DateTimeUtils.UTC_ZONE).getLocalTimestampString();
+        DateString dateVal = ((RexLiteral) node).getValueAs(DateString.class);
+        if (dateVal == null) {
+          return null;
+        }
+        return dateVal.getMillisSinceEpoch();
       }
       break;
     case CAST:
@@ -293,7 +292,7 @@ public class DruidDateTimeUtils {
               || callType.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
           && callType.isNullable()
           && !operandType.isNullable()) {
-        return literalValue(operand, timeZone);
+        return literalValue(operand);
       }
     }
     return null;
@@ -308,21 +307,36 @@ public class DruidDateTimeUtils {
    * @param node the Rex node
    * @return the granularity, or null if it cannot be inferred
    */
+  @Nullable
   public static Granularity extractGranularity(RexNode node, String timeZone) {
+    final int valueIndex;
     final int flagIndex;
 
     if (TimeExtractionFunction.isValidTimeExtract(node)) {
       flagIndex = 0;
+      valueIndex = 1;
     } else if (TimeExtractionFunction.isValidTimeFloor(node)) {
+      valueIndex = 0;
       flagIndex = 1;
     } else {
       // We can only infer granularity from floor and extract.
       return null;
     }
     final RexCall call = (RexCall) node;
+    final RexNode value = call.operands.get(valueIndex);
     final RexLiteral flag = (RexLiteral) call.operands.get(flagIndex);
     final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
-    return Granularities.createGranularity(timeUnit, timeZone);
+
+    final RelDataType valueType = value.getType();
+    if (valueType.getSqlTypeName() == SqlTypeName.DATE
+        || valueType.getSqlTypeName() == SqlTypeName.TIMESTAMP) {
+      // We use 'UTC' for date/timestamp type as Druid needs timezone information
+      return Granularities.createGranularity(timeUnit, "UTC");
+    } else if (valueType.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      return Granularities.createGranularity(timeUnit, timeZone);
+    }
+    // Type not recognized
+    return null;
   }
 
   /**
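
For reference, the new Range<Long> bounds translate into half-open Joda-Time
intervals as in toInterval() above; a sketch (the default start/end are passed
in here rather than read from DruidTable.DEFAULT_INTERVAL):

  import com.google.common.collect.BoundType;
  import com.google.common.collect.Range;

  import org.joda.time.Interval;
  import org.joda.time.chrono.ISOChronology;

  final class RangeToInterval {
    private RangeToInterval() {}

    static Interval toInterval(Range<Long> range, long defaultStart,
        long defaultEnd) {
      long start = range.hasLowerBound() ? range.lowerEndpoint() : defaultStart;
      long end = range.hasUpperBound() ? range.upperEndpoint() : defaultEnd;
      if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) {
        start++; // '>' lower bound becomes an inclusive start 1 ms later
      }
      if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) {
        end++;   // '<=' upper bound becomes an exclusive end 1 ms later
      }
      return new Interval(start, end, ISOChronology.getInstanceUTC());
    }
  }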

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
index 0e1965c..bae6a4d 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
@@ -146,11 +146,11 @@ public class DruidExpressions {
       } else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) {
         return
             DruidExpressions.stringLiteral(RexLiteral.stringValue(rexNode));
-      } else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName
+      } else if (SqlTypeName.DATE == sqlTypeName
+          || SqlTypeName.TIMESTAMP == sqlTypeName
           || SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE == sqlTypeName) {
-        return DruidExpressions.numberLiteral(DruidDateTimeUtils
-            .literalValue(rexNode, TimeZone.getTimeZone(druidRel.getConnectionConfig().timeZone()))
-            .getMillisSinceEpoch());
+        return DruidExpressions.numberLiteral(
+            DruidDateTimeUtils.literalValue(rexNode));
       } else if (SqlTypeName.BOOLEAN == sqlTypeName) {
         return DruidExpressions.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0);
       }

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
index 11ec2be..cca9d6b 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
@@ -24,7 +25,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.TimestampString;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.annotations.VisibleForTesting;
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
-import java.util.TimeZone;
 
 import javax.annotation.Nullable;
 
@@ -47,6 +46,9 @@ import javax.annotation.Nullable;
  */
 abstract class DruidJsonFilter implements DruidJson {
 
+  private static final SimpleDateFormat DATE_FORMATTER = getDateFormatter();
+
   /**
    * @param rexNode    rexNode to translate to Druid Json Filter
    * @param rowType    rowType associated to rexNode
@@ -206,30 +208,22 @@ abstract class DruidJsonFilter implements DruidJson {
   @Nullable
   private static String toDruidLiteral(RexNode rexNode, RelDataType rowType,
       DruidQuery druidQuery) {
-    final SimpleDateFormat dateFormatter = new SimpleDateFormat(
-        TimeExtractionFunction.ISO_TIME_FORMAT,
-        Locale.ROOT);
-    final String timeZone = druidQuery.getConnectionConfig().timeZone();
-    if (timeZone != null) {
-      dateFormatter.setTimeZone(TimeZone.getTimeZone(timeZone));
-    }
     final String val;
     final RexLiteral rhsLiteral = (RexLiteral) rexNode;
     if (SqlTypeName.NUMERIC_TYPES.contains(rhsLiteral.getTypeName())) {
       val = String.valueOf(RexLiteral.value(rhsLiteral));
     } else if (SqlTypeName.CHAR_TYPES.contains(rhsLiteral.getTypeName())) {
       val = String.valueOf(RexLiteral.stringValue(rhsLiteral));
-    } else if (SqlTypeName.TIMESTAMP == rhsLiteral.getTypeName() || SqlTypeName.DATE == rhsLiteral
-        .getTypeName() || SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE == rhsLiteral.getTypeName()) {
-      TimestampString timestampString = DruidDateTimeUtils
-          .literalValue(rexNode, TimeZone.getTimeZone(timeZone));
-      if (timestampString == null) {
+    } else if (SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE == rhsLiteral.getTypeName()
+        || SqlTypeName.TIMESTAMP == rhsLiteral.getTypeName()
+        || SqlTypeName.DATE == rhsLiteral.getTypeName()) {
+      Long millisSinceEpoch = DruidDateTimeUtils.literalValue(rexNode);
+      if (millisSinceEpoch == null) {
         throw new AssertionError(
             "Cannot translate Literal" + rexNode + " of type "
                 + rhsLiteral.getTypeName() + " to TimestampString");
       }
-      //@TODO this is unnecessary we can send time as Long (eg millis since epoch) to druid
-      val = dateFormatter.format(timestampString.getMillisSinceEpoch());
+      val = DATE_FORMATTER.format(millisSinceEpoch);
     } else {
       // Don't know how to filter on this kind of literal.
       val = null;
@@ -637,6 +631,14 @@ abstract class DruidJsonFilter implements DruidJson {
       generator.writeEndObject();
     }
   }
+
+  private static SimpleDateFormat getDateFormatter() {
+    final SimpleDateFormat dateFormatter = new SimpleDateFormat(
+        TimeExtractionFunction.ISO_TIME_FORMAT,
+        Locale.ROOT);
+    dateFormatter.setTimeZone(DateTimeUtils.UTC_ZONE);
+    return dateFormatter;
+  }
 }
 
 // End DruidJsonFilter.java

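With all literals rendered in UTC, the per-call SimpleDateFormat that was
previously configured from the connection time zone becomes the single
DATE_FORMATTER constant above, pinned to UTC. A minimal sketch of what it
produces, assuming ISO_TIME_FORMAT is the usual ISO-8601 pattern (the real
pattern lives in TimeExtractionFunction):

import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

public class UtcFormatSketch {
  public static void main(String[] args) {
    // Assumed pattern; see TimeExtractionFunction.ISO_TIME_FORMAT.
    SimpleDateFormat f =
        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT);
    f.setTimeZone(TimeZone.getTimeZone("UTC"));
    // 1442016418000L is 2015-09-12T00:06:58Z:
    System.out.println(f.format(1442016418000L));
  }
}

Note that SimpleDateFormat is not thread-safe, so the shared constant
assumes query translation happens on a single thread at a time.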
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index d926060..611c722 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -18,6 +18,7 @@ package org.apache.calcite.adapter.druid;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.interpreter.BindableRel;
 import org.apache.calcite.interpreter.Bindables;
@@ -259,11 +260,13 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     switch (rexNode.getKind()) {
     case INPUT_REF:
       columnName = extractColumnName(rexNode, rowType, druidQuery);
-      //@TODO we can remove this ugly check by treating druid time columns as LONG
-      if (rexNode.getType().getFamily() == SqlTypeFamily.DATE
-          || rexNode.getType().getFamily() == SqlTypeFamily.TIMESTAMP) {
-        extractionFunction = TimeExtractionFunction
-            .createDefault(druidQuery.getConnectionConfig().timeZone());
+      if (rexNode.getType().getSqlTypeName() == SqlTypeName.DATE
+          || rexNode.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP
+          || rexNode.getType().getSqlTypeName()
+          == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+        // These types are all internally UTC millis, so use UTC
+        extractionFunction = TimeExtractionFunction.createDefault(
+            DateTimeUtils.UTC_ZONE.getID());
       } else {
         extractionFunction = null;
       }
@@ -278,12 +281,24 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       if (!TimeExtractionFunction.isValidTimeExtract((RexCall) rexNode)) {
         return Pair.of(null, null);
       }
-      extractionFunction =
-          TimeExtractionFunction.createExtractFromGranularity(granularity,
-              druidQuery.getConnectionConfig().timeZone());
-      columnName =
-          extractColumnName(((RexCall) rexNode).getOperands().get(1), rowType, druidQuery);
-
+      RexNode extractValueNode = ((RexCall) rexNode).getOperands().get(1);
+      if (extractValueNode.getType().getSqlTypeName() == SqlTypeName.DATE
+          || extractValueNode.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) {
+        // Use 'UTC' at the extraction level
+        extractionFunction =
+            TimeExtractionFunction.createExtractFromGranularity(
+                granularity, DateTimeUtils.UTC_ZONE.getID());
+        columnName = extractColumnName(extractValueNode, rowType, druidQuery);
+      } else if (extractValueNode.getType().getSqlTypeName()
+          == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+        // Use local time zone at the extraction level
+        extractionFunction =
+            TimeExtractionFunction.createExtractFromGranularity(
+                granularity, druidQuery.getConnectionConfig().timeZone());
+        columnName = extractColumnName(extractValueNode, rowType, druidQuery);
+      } else {
+        return Pair.of(null, null);
+      }
       break;
     case FLOOR:
       granularity = DruidDateTimeUtils
@@ -295,11 +310,20 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       if (!TimeExtractionFunction.isValidTimeFloor((RexCall) rexNode)) {
         return Pair.of(null, null);
       }
-      extractionFunction =
-          TimeExtractionFunction
-              .createFloorFromGranularity(granularity, druidQuery.getConnectionConfig().timeZone());
-      columnName =
-          extractColumnName(((RexCall) rexNode).getOperands().get(0), rowType, druidQuery);
+      RexNode floorValueNode = ((RexCall) rexNode).getOperands().get(0);
+      if (floorValueNode.getType().getSqlTypeName() == SqlTypeName.DATE
+          || floorValueNode.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP
+          || floorValueNode.getType().getSqlTypeName()
+          == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+        // Use 'UTC' at the extraction level, since all datetime types
+        // are represented in 'UTC'
+        extractionFunction =
+            TimeExtractionFunction.createFloorFromGranularity(
+                granularity, DateTimeUtils.UTC_ZONE.getID());
+        columnName = extractColumnName(floorValueNode, rowType, druidQuery);
+      } else {
+        return Pair.of(null, null);
+      }
       break;
     case CAST:
       // CASE we have a cast over InputRef. Check that cast is valid
@@ -310,8 +334,8 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
           extractColumnName(((RexCall) rexNode).getOperands().get(0), rowType, druidQuery);
       // CASE CAST to TIME/DATE need to make sure that we have valid extraction fn
       final SqlTypeName toTypeName = rexNode.getType().getSqlTypeName();
-      if (toTypeName.getFamily() == SqlTypeFamily.TIMESTAMP
-          || toTypeName.getFamily() == SqlTypeFamily.DATETIME) {
+      if (toTypeName == SqlTypeName.DATE || toTypeName == SqlTypeName.TIMESTAMP
+          || toTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
         extractionFunction = TimeExtractionFunction.translateCastToTimeExtract(rexNode,
             TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone()));
         if (extractionFunction == null) {

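The net effect of the branches above: DATE and TIMESTAMP operands are
always extracted and floored in UTC, since their internal representation
carries no zone; only TIMESTAMP WITH LOCAL TIME ZONE consults the
connection's timeZone property. The tests further down show the resulting
Druid JSON: with connection time zone Asia/Kolkata, flooring the
local-zone __time column to HOUR yields
"granularity":{"type":"period","period":"PT1H","timeZone":"Asia/Kolkata"}
while the timeFormat extraction keeps "timeZone":"UTC".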
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 7b6bc78..83d4fce 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -196,9 +196,7 @@ public class DruidRules {
             .unwrap(CalciteConnectionConfig.class).timeZone();
         assert timeZone != null;
         intervals = DruidDateTimeUtils.createInterval(
-            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false),
-
-            query.getConnectionConfig().timeZone());
+            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false));
         if (intervals == null || intervals.isEmpty()) {
          // Case: the filter contains an extract that cannot be pushed down as an interval
           triple.getMiddle().addAll(triple.getLeft());

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
index 0ca6cc7..def28c6 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSqlCastConverter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
@@ -53,16 +54,42 @@ public class DruidSqlCastConverter implements DruidSqlOperatorConverter {
     final SqlTypeName toType = rexNode.getType().getSqlTypeName();
     final String timeZoneConf = druidQuery.getConnectionConfig().timeZone();
     final TimeZone timeZone = TimeZone.getTimeZone(timeZoneConf == null ? "UTC" : timeZoneConf);
+    final boolean nullEqualToEmpty = druidQuery.getConnectionConfig().nullEqualToEmpty();
 
     if (SqlTypeName.CHAR_TYPES.contains(fromType) && SqlTypeName.DATETIME_TYPES.contains(toType)) {
       //case chars to dates
       return castCharToDateTime(timeZone, operandExpression, toType,
-          druidQuery.getConnectionConfig().nullEqualToEmpty() ? "" : null);
+          nullEqualToEmpty ? "" : null);
     } else if (SqlTypeName.DATETIME_TYPES.contains(fromType) && SqlTypeName.CHAR_TYPES.contains
         (toType)) {
       //case dates to chars
-      return castDateTimeToChar(timeZone, operandExpression,
-          fromType);
+      return castDateTimeToChar(timeZone, operandExpression, fromType);
+    } else if (toType == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+        && SqlTypeName.DATETIME_TYPES.contains(fromType)) {
+      if (timeZone.equals(DateTimeUtils.UTC_ZONE)) {
+        // bail out, internal representation is the same,
+        // we do not need to do anything
+        return operandExpression;
+      }
+      // to timestamp with local time zone
+      return castCharToDateTime(
+          timeZone,
+          castDateTimeToChar(DateTimeUtils.UTC_ZONE, operandExpression, fromType),
+          toType,
+          nullEqualToEmpty ? "" : null);
+    } else if (fromType == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+        && SqlTypeName.DATETIME_TYPES.contains(toType)) {
+      if (toType != SqlTypeName.DATE && timeZone.equals(DateTimeUtils.UTC_ZONE)) {
+        // bail out, internal representation is the same,
+        // we do not need to do anything
+        return operandExpression;
+      }
+      // timestamp with local time zone to other types
+      return castCharToDateTime(
+          DateTimeUtils.UTC_ZONE,
+          castDateTimeToChar(timeZone, operandExpression, fromType),
+          toType,
+          nullEqualToEmpty ? "" : null);
     } else {
       // Handle other casts.
       final DruidType fromExprType = DruidExpressions.EXPRESSION_TYPES.get(fromType);
@@ -115,8 +142,8 @@ public class DruidSqlCastConverter implements DruidSqlOperatorConverter {
           Period.days(1).toString(),
           "",
           timeZone);
-    } else if (toType == SqlTypeName.TIMESTAMP || toType == SqlTypeName
-        .TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+    } else if (toType == SqlTypeName.TIMESTAMP
+        || toType == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
       return timestampExpression;
     } else {
       throw new IllegalStateException(

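Casts between "timestamp with local time zone" and the other datetime
types are implemented above as a format/parse round trip through a string.
Illustratively (this is the rough shape, not verbatim output; the exact
expression comes from castDateTimeToChar and castCharToDateTime), casting
the local-zone __time column to TIMESTAMP under connection time zone
Asia/Kolkata produces something like

  timestamp_parse(timestamp_format("__time", <format>, 'Asia/Kolkata'),
      <format>, 'UTC')

which is why the adapter tests in this commit now match on
"timestamp_parse" rather than "timestamp_floor" for such casts.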
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
index add0136..e714609 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
@@ -57,12 +57,33 @@ public class DruidTableFactory implements TableFactory {
     final Map<String, SqlTypeName> fieldBuilder = new LinkedHashMap<>();
     final Map<String, List<ComplexMetric>> complexMetrics = new HashMap<>();
     final String timestampColumnName;
-    if (operand.get("timestampColumn") != null) {
-      timestampColumnName = (String) operand.get("timestampColumn");
+    final SqlTypeName timestampColumnType;
+    final Object timestampInfo = operand.get("timestampColumn");
+    if (timestampInfo != null) {
+      if (timestampInfo instanceof Map) {
+        Map map = (Map) timestampInfo;
+        if (!(map.get("name") instanceof String)) {
+          throw new IllegalArgumentException("timestampColumn array must have name");
+        }
+        timestampColumnName = (String) map.get("name");
+        if (!(map.get("type") instanceof String)
+            || map.get("type").equals("timestamp with local time zone")) {
+          timestampColumnType = SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        } else if (map.get("type").equals("timestamp")) {
+          timestampColumnType = SqlTypeName.TIMESTAMP;
+        } else {
+          throw new IllegalArgumentException("unexpected type for timestampColumn array");
+        }
+      } else {
+        // String (for backwards compatibility)
+        timestampColumnName = (String) timestampInfo;
+        timestampColumnType = SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+      }
     } else {
       timestampColumnName = DruidTable.DEFAULT_TIMESTAMP_COLUMN;
+      timestampColumnType = SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
     }
-    fieldBuilder.put(timestampColumnName, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+    fieldBuilder.put(timestampColumnName, timestampColumnType);
     final Object dimensionsRaw = operand.get("dimensions");
     if (dimensionsRaw instanceof List) {
       // noinspection unchecked

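With this change, the "timestampColumn" operand of a model file accepts
either the legacy string form or a map with an explicit type. A sketch of
both forms ("ts" is an illustrative column name; if "type" is absent, it
defaults to "timestamp with local time zone"):

  "timestampColumn": "ts"

  "timestampColumn": {
    "name": "ts",
    "type": "timestamp"
  }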
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
index 6e35540..f31b0dc 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/ExtractOperatorConversion.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -23,6 +24,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -72,8 +74,11 @@ public class ExtractOperatorConversion implements DruidSqlOperatorConverter {
       return null;
     }
 
-    return DruidExpressions.applyTimeExtract(
-        input, druidUnit, TimeZone.getTimeZone(query.getConnectionConfig().timeZone()));
+    final TimeZone tz =
+        arg.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+            ? TimeZone.getTimeZone(query.getConnectionConfig().timeZone())
+            : DateTimeUtils.UTC_ZONE;
+    return DruidExpressions.applyTimeExtract(input, druidUnit, tz);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
index 0d8ecc1..187fa66 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/FloorOperatorConversion.java
@@ -16,11 +16,13 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.util.TimeZone;
 
@@ -51,8 +53,14 @@ public class FloorOperatorConversion implements DruidSqlOperatorConverter {
       return  DruidQuery.format("floor(%s)", druidExpression);
     } else if (call.getOperands().size() == 2) {
       // FLOOR(expr TO timeUnit)
+      final TimeZone tz;
+      if (arg.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+        tz = TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone());
+      } else {
+        tz = DateTimeUtils.UTC_ZONE;
+      }
       final Granularity granularity = DruidDateTimeUtils
-          .extractGranularity(call, druidQuery.getConnectionConfig().timeZone());
+          .extractGranularity(call, tz.getID());
       if (granularity == null) {
         return null;
       }
@@ -64,7 +72,7 @@ public class FloorOperatorConversion implements DruidSqlOperatorConverter {
           druidExpression,
           isoPeriodFormat,
           "",
-          TimeZone.getTimeZone(druidQuery.getConnectionConfig().timeZone()));
+          tz);
     } else {
       return null;
     }

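Both operator conversions above now derive the time zone from the
operand's type instead of unconditionally using the connection's: DATE and
TIMESTAMP operands are evaluated in UTC, and only "timestamp with local
time zone" operands use the connection zone. For example (illustrative
column names), with connection time zone Asia/Kolkata:

  FLOOR(ts_ltz_col TO DAY)   -- floored on Asia/Kolkata day boundaries
  FLOOR(ts_col TO DAY)       -- floored on UTC day boundaries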
http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
index 552d221..d0c1596 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -16,13 +16,13 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -209,17 +209,37 @@ public class TimeExtractionFunction implements ExtractionFunction {
       // unknown format
       return null;
     }
-    if (rexCall.getType().getFamily() == SqlTypeFamily.DATE) {
-      return new TimeExtractionFunction(castFormat,
-          Granularities.createGranularity(TimeUnitRange.DAY, timeZoneId), timeZoneId,
-          Locale.ENGLISH.toString());
+    SqlTypeName fromType = rexCall.getOperands().get(0).getType().getSqlTypeName();
+    SqlTypeName toType = rexCall.getType().getSqlTypeName();
+    String granularityTZId;
+    switch (fromType) {
+    case DATE:
+    case TIMESTAMP:
+      granularityTZId = DateTimeUtils.UTC_ZONE.getID();
+      break;
+    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      granularityTZId = timeZoneId;
+      break;
+    default:
+      return null;
     }
-    if (rexCall.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP
-        || rexCall.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
-      return new TimeExtractionFunction(castFormat, null, timeZoneId, Locale.ENGLISH.toString());
+    switch (toType) {
+    case DATE:
+      return new TimeExtractionFunction(castFormat,
+          Granularities.createGranularity(TimeUnitRange.DAY, granularityTZId),
+          DateTimeUtils.UTC_ZONE.getID(), Locale.ENGLISH.toString());
+    case TIMESTAMP:
+      // date -> timestamp: UTC
+      // timestamp -> timestamp: UTC
+      // timestamp with local time zone -> timestamp: granularityTZId
+      return new TimeExtractionFunction(
+          castFormat, null, granularityTZId, Locale.ENGLISH.toString());
+    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      return new TimeExtractionFunction(
+          castFormat, null, DateTimeUtils.UTC_ZONE.getID(), Locale.ENGLISH.toString());
+    default:
+      return null;
     }
-
-    return null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/b29397d9/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 24616b0..2a57303 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -70,6 +70,10 @@ import static org.junit.Assert.assertTrue;
  *   <li>push SORT and/or LIMIT into "groupBy" query</li>
  *   <li>push HAVING into "groupBy" query</li>
  * </ul>
+ *
+ * <p>These tests use the "timestamp with local time zone" type for the
+ * Druid timestamp column, instead of the "timestamp" type used by
+ * {@link DruidAdapterIT2}.
  */
 public class DruidAdapterIT {
   /** URL of the "druid-foodmart" model. */
@@ -347,8 +351,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{'queryType':'scan',"
         + "'dataSource':'wikiticker',"
         + "'intervals':['1900-01-01T00:00:00.000Z/2015-10-12T00:00:00.000Z'],"
-        + "'virtualColumns':[{'type':'expression','name':'vc','expression':'\\'__time\\'','outputType':'LONG'}],'columns':['vc'],"
-        + "'resultFormat':'compactedList'";
+        + "'virtualColumns':[{'type':'expression','name':'vc','expression':";
     sql(sql, WIKI_AUTO2)
         .limit(2)
         .returnsUnordered("__time=2015-09-12 00:46:58",
@@ -3242,7 +3245,7 @@ public class DruidAdapterIT {
 
     final String druidQuery = "{'queryType':'scan','dataSource':'foodmart','intervals':"
         + "['1997-01-01T00:00:00.000Z/1997-04-01T00:00:00.000Z'],'virtualColumns':"
-        + "[{'type':'expression','name':'vc','expression':'timestamp_floor(\\'__time\\'";
+        + "[{'type':'expression','name':'vc','expression':'timestamp_floor(";
     sql(sql, FOODMART)
         .returnsOrdered("T=1997-01-01 00:00:00", "T=1997-01-01 00:00:00")
         .queryContains(
@@ -3285,36 +3288,61 @@ public class DruidAdapterIT {
         + "\"ordering\":\"lexicographic\",\"extractionFn\":{\"type\":\"timeFormat\","
         + "\"format\":\"yyyy-MM-dd";
     final String druidQueryPart2 = "\"granularity\":{\"type\":\"period\",\"period\":\"PT1H\","
-        + "\"timeZone\":\"IST\"},\"timeZone\":\"IST\","
+        + "\"timeZone\":\"Asia/Kolkata\"},\"timeZone\":\"UTC\","
         + "\"locale\":\"und\"}}";
 
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", WIKI_AUTO2.getPath()))
-        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "IST")
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "Asia/Kolkata")
         .query(sql)
         .runs()
         .queryContains(druidChecker(druidQueryPart1, druidQueryPart2))
-        .returnsOrdered("T=2015-09-12 02:30:02");
+        .returnsOrdered("T=2015-09-12 14:00:01");
   }
 
   @Test public void testTimeWithFilterOnFloorOnTimeWithTimezoneConversion() {
     final String sql = "Select cast(\"__time\" as timestamp) as t, \"countryName\" as s, "
         + "count(*) as c from \"wikiticker\" where floor(\"__time\" to HOUR)"
-        + " >= '2015-09-12 08:00:00 IST' group by cast(\"__time\" as timestamp), \"countryName\""
+        + " >= '2015-09-12 08:00:00 Asia/Kolkata' group by cast(\"__time\" as timestamp), \"countryName\""
         + " order by t limit 4";
     final String druidQueryPart1 = "filter\":{\"type\":\"bound\",\"dimension\":\"__time\","
-        + "\"lower\":\"2015-09-12T08:00:00.000Z\",\"lowerStrict\":false,"
+        + "\"lower\":\"2015-09-12T02:30:00.000Z\",\"lowerStrict\":false,"
+        + "\"ordering\":\"lexicographic\",\"extractionFn\":{\"type\":\"timeFormat\","
+        + "\"format\":\"yyyy-MM-dd";
+    final String druidQueryPart2 = "\"granularity\":{\"type\":\"period\",\"period\":\"PT1H\","
+        + "\"timeZone\":\"Asia/Kolkata\"},\"timeZone\":\"UTC\","
+        + "\"locale\":\"und\"}}";
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", WIKI_AUTO2.getPath()))
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "Asia/Kolkata")
+        .query(sql)
+        .runs()
+        .queryContains(druidChecker(druidQueryPart1, druidQueryPart2))
+        .returnsOrdered("T=2015-09-12 08:00:02; S=null; C=1",
+            "T=2015-09-12 08:00:04; S=null; C=1",
+            "T=2015-09-12 08:00:05; S=null; C=1",
+            "T=2015-09-12 08:00:07; S=null; C=1");
+  }
+
+  @Test public void testTimeWithFilterOnFloorOnTimeWithTimezoneConversionCast() {
+    final String sql = "Select cast(\"__time\" as timestamp) as t, \"countryName\" as s, "
+        + "count(*) as c from \"wikiticker\" where floor(\"__time\" to HOUR)"
+        + " >= '2015-09-12 08:00:00 Asia/Kolkata' group by cast(\"__time\" as timestamp), \"countryName\""
+        + " order by t limit 4";
+    final String druidQueryPart1 = "filter\":{\"type\":\"bound\",\"dimension\":\"__time\","
+        + "\"lower\":\"2015-09-12T02:30:00.000Z\",\"lowerStrict\":false,"
         + "\"ordering\":\"lexicographic\",\"extractionFn\":{\"type\":\"timeFormat\","
         + "\"format\":\"yyyy-MM-dd";
     final String druidQueryPart2 = "\"granularity\":{\"type\":\"period\",\"period\":\"PT1H\","
-        + "\"timeZone\":\"IST\"},\"timeZone\":\"IST\","
+        + "\"timeZone\":\"Asia/Kolkata\"},\"timeZone\":\"UTC\","
         + "\"locale\":\"und\"}}";
 
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", WIKI_AUTO2.getPath()))
-        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "IST")
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "Asia/Kolkata")
         .query(sql)
         .runs()
         .queryContains(druidChecker(druidQueryPart1, druidQueryPart2))
@@ -3343,7 +3371,7 @@ public class DruidAdapterIT {
         + " limit 1";
     final String druidQuery = "{\"queryType\":\"scan\",\"dataSource\":\"foodmart\",\"intervals\":"
         + "[\"1997-04-30T18:30:00.000Z/1997-05-31T18:30:00.000Z\"],\"virtualColumns\":[{\"type\":"
-        + "\"expression\",\"name\":\"vc\",\"expression\":\"timestamp_floor(\\\"__time\\\"";
+        + "\"expression\",\"name\":\"vc\",\"expression\":\"timestamp_parse";
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", FOODMART.getPath()))
@@ -3351,7 +3379,7 @@ public class DruidAdapterIT {
         .query(sql)
         .runs()
         .queryContains(druidChecker(druidQuery))
-        .returnsOrdered("T=1997-04-30 18:30:00");
+        .returnsOrdered("T=1997-05-01 00:00:00");
   }
 
   @Test
@@ -3592,7 +3620,7 @@ public class DruidAdapterIT {
             + "groups=[{}], aggs=[[COUNT()]])")
         .queryContains(
             druidChecker("{\"type\":\"expression\","
-                + "\"expression\":\"(timestamp_format(timestamp_floor(\\\"__time\\\""))
+                + "\"expression\":\"(timestamp_format(timestamp_floor("))
         .returnsUnordered("EXPR$0=117");
   }
 
@@ -3643,9 +3671,9 @@ public class DruidAdapterIT {
 
   @Test
   public void testExtractHourFilterExpression() {
-    final String sql = "SELECT EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) "
+    final String sql = "SELECT EXTRACT(HOUR from \"timestamp\") "
             + "from \"foodmart\" WHERE EXTRACT(HOUR from \"timestamp\") = 17 "
-            + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
+            + "group by EXTRACT(HOUR from \"timestamp\") ";
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", FOODMART.getPath()))
@@ -3654,10 +3682,47 @@ public class DruidAdapterIT {
         .runs()
         .returnsOrdered("EXPR$0=17");
 
-    final String sql2 = "SELECT EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) "
+    final String sql2 = "SELECT EXTRACT(HOUR from \"timestamp\") "
             + "from \"foodmart\" WHERE"
-            + " EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) = 19 "
-            + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
+            + " EXTRACT(HOUR from \"timestamp\") = 19 "
+            + "group by EXTRACT(HOUR from \"timestamp\") ";
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", FOODMART.getPath()))
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "EST")
+        .query(sql2)
+        .runs()
+        .returnsOrdered("EXPR$0=19");
+
+    final String sql3 = "SELECT EXTRACT(HOUR from \"timestamp\") "
+            + "from \"foodmart\" WHERE EXTRACT(HOUR from \"timestamp\") = 0 "
+            + "group by EXTRACT(HOUR from \"timestamp\") ";
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", FOODMART.getPath()))
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "UTC")
+        .query(sql3)
+        .runs()
+        .returnsOrdered("EXPR$0=0");
+  }
+
+  @Test
+  public void testExtractHourFilterExpressionWithCast() {
+    final String sql = "SELECT EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) "
+        + "from \"foodmart\" WHERE EXTRACT(HOUR from \"timestamp\") = 17 "
+        + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", FOODMART.getPath()))
+        .with(CalciteConnectionProperty.TIME_ZONE.camelName(), "America/Los_Angeles")
+        .query(sql)
+        .runs()
+        .returnsOrdered("EXPR$0=17");
+
+    final String sql2 = "SELECT EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) "
+        + "from \"foodmart\" WHERE"
+        + " EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) = 19 "
+        + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", FOODMART.getPath()))
@@ -3667,8 +3732,8 @@ public class DruidAdapterIT {
         .returnsOrdered("EXPR$0=19");
 
     final String sql3 = "SELECT EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) "
-            + "from \"foodmart\" WHERE EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) = 0 "
-            + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
+        + "from \"foodmart\" WHERE EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) = 0 "
+        + "group by EXTRACT(HOUR from CAST(\"timestamp\" AS TIMESTAMP)) ";
     CalciteAssert.that()
         .enable(enabled())
         .with(ImmutableMap.of("model", FOODMART.getPath()))
@@ -3707,7 +3772,7 @@ public class DruidAdapterIT {
         .queryContains(
             druidChecker("\"filter\":{\"type\":\"expression\",\"expression\":\""
                     + "(timestamp_floor(timestamp_parse(concat(concat(",
-                "== timestamp_floor(\\\"__time\\\""));
+                "== timestamp_floor("));
   }
 
   @Test