Posted to commits@calcite.apache.org by mm...@apache.org on 2017/09/07 01:50:47 UTC

[2/4] calcite git commit: [CALCITE-1947] Add time/timestamp with local time zone types to optimizer

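For orientation before the diff: TIMESTAMP WITH LOCAL TIME ZONE (and its TIME counterpart) models a zoneless instant that is rendered in the connection's time zone, and this patch threads that zone through the optimizer and the Druid adapter. A minimal sketch of the new literal at the Rex level, using only classes and methods that appear in the diff below (the wrapper class name is illustrative):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexLiteral;
    import org.apache.calcite.util.TimestampString;

    public class LtzLiteralSketch {
      public static void main(String[] args) {
        RexBuilder rexBuilder =
            new RexBuilder(new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
        // The literal stores a zoneless instant; the connection time zone
        // only matters when the value is rendered or cast.
        RexLiteral tsLtz =
            rexBuilder.makeTimestampWithLocalTimeZoneLiteral(
                new TimestampString(2011, 7, 20, 8, 23, 45), 0);
        System.out.println(tsLtz);
      }
    }
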
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/core/src/test/java/org/apache/calcite/test/RexProgramTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RexProgramTest.java b/core/src/test/java/org/apache/calcite/test/RexProgramTest.java
index 7cbcfa6..ccecc98 100644
--- a/core/src/test/java/org/apache/calcite/test/RexProgramTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RexProgramTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.calcite.test;
 
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.Strong;
 import org.apache.calcite.rel.type.RelDataType;
@@ -27,6 +29,8 @@ import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.rex.RexExecutorImpl;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexLocalRef;
@@ -35,6 +39,7 @@ import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
@@ -48,6 +53,7 @@ import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.TestUtil;
 import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
+import org.apache.calcite.util.TimestampWithTimeZoneString;
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
@@ -65,6 +71,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.TreeMap;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -100,7 +107,9 @@ public class RexProgramTest {
   public void setUp() {
     typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     rexBuilder = new RexBuilder(typeFactory);
-    simplify = new RexSimplify(rexBuilder, false, RexUtil.EXECUTOR);
+    RexExecutor executor =
+        new RexExecutorImpl(new DummyTestDataContext());
+    simplify = new RexSimplify(rexBuilder, false, executor);
     trueLiteral = rexBuilder.makeLiteral(true);
     falseLiteral = rexBuilder.makeLiteral(false);
     final RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
@@ -108,6 +117,34 @@ public class RexProgramTest {
     unknownLiteral = rexBuilder.makeNullLiteral(trueLiteral.getType());
   }
 
+  /** Dummy data context for test. */
+  private static class DummyTestDataContext implements DataContext {
+    private final ImmutableMap<String, Object> map;
+
+    DummyTestDataContext() {
+      this.map =
+          ImmutableMap.<String, Object>of(
+              Variable.TIME_ZONE.camelName, TimeZone.getTimeZone("America/Los_Angeles"),
+              Variable.CURRENT_TIMESTAMP.camelName, 1311120000000L);
+    }
+
+    public SchemaPlus getRootSchema() {
+      return null;
+    }
+
+    public JavaTypeFactory getTypeFactory() {
+      return null;
+    }
+
+    public QueryProvider getQueryProvider() {
+      return null;
+    }
+
+    public Object get(String name) {
+      return map.get(name);
+    }
+  }
+
   private void checkCnf(RexNode node, String expected) {
     assertThat(RexUtil.toCnf(rexBuilder, node).toString(), equalTo(expected));
   }
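
The setUp() change above replaces the static RexUtil.EXECUTOR with a RexExecutorImpl backed by a DataContext that pins the time zone and the current timestamp, so simplification of time-zone-dependent casts is deterministic. A hedged fragment showing what the executor can read from that context (keys per the map above; assumes DummyTestDataContext is in scope):

    DataContext ctx = new DummyTestDataContext();
    TimeZone tz =
        (TimeZone) ctx.get(DataContext.Variable.TIME_ZONE.camelName);     // America/Los_Angeles
    long now =
        (Long) ctx.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName); // 1311120000000L
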
@@ -1491,6 +1528,95 @@ public class RexProgramTest {
         "1970-01-01 00:00:00"); // different from Hive
   }
 
+  @Test public void testSimplifyCastLiteral3() {
+    // Default TimeZone is "America/Los_Angeles" (DummyTestDataContext)
+    final RexLiteral literalDate = rexBuilder.makeDateLiteral(new DateString("2011-07-20"));
+    final RexLiteral literalTime = rexBuilder.makeTimeLiteral(new TimeString("12:34:56"), 0);
+    final RexLiteral literalTimestamp = rexBuilder.makeTimestampLiteral(
+        new TimestampString("2011-07-20 12:34:56"), 0);
+    final RexLiteral literalTimeLTZ =
+        rexBuilder.makeTimeWithLocalTimeZoneLiteral(
+            new TimeString(1, 23, 45), 0);
+    final RexLiteral timeLTZChar1 = rexBuilder.makeLiteral("12:34:45 America/Los_Angeles");
+    final RexLiteral timeLTZChar2 = rexBuilder.makeLiteral("12:34:45 UTC");
+    final RexLiteral timeLTZChar3 = rexBuilder.makeLiteral("12:34:45 GMT+01");
+    final RexLiteral timestampLTZChar1 = rexBuilder.makeLiteral("2011-07-20 12:34:56 Asia/Tokyo");
+    final RexLiteral timestampLTZChar2 = rexBuilder.makeLiteral("2011-07-20 12:34:56 GMT+01");
+    final RexLiteral timestampLTZChar3 = rexBuilder.makeLiteral("2011-07-20 12:34:56 UTC");
+    final RexLiteral literalTimestampLTZ =
+        rexBuilder.makeTimestampWithLocalTimeZoneLiteral(
+            new TimestampString(2011, 7, 20, 8, 23, 45), 0);
+
+    final RelDataType dateType =
+        typeFactory.createSqlType(SqlTypeName.DATE);
+    final RelDataType timeType =
+        typeFactory.createSqlType(SqlTypeName.TIME);
+    final RelDataType timestampType =
+        typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+    final RelDataType timeLTZType =
+        typeFactory.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
+    final RelDataType timestampLTZType =
+        typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+    final RelDataType varCharType =
+        typeFactory.createSqlType(SqlTypeName.VARCHAR, 40);
+
+    checkSimplify(cast(timeLTZChar1, timeLTZType), "20:34:45");
+    checkSimplify(cast(timeLTZChar2, timeLTZType), "12:34:45");
+    checkSimplify(cast(timeLTZChar3, timeLTZType), "11:34:45");
+    checkSimplify(cast(literalTimeLTZ, timeLTZType), "01:23:45");
+    checkSimplify(cast(timestampLTZChar1, timestampLTZType),
+        "2011-07-20 03:34:56");
+    checkSimplify(cast(timestampLTZChar2, timestampLTZType),
+        "2011-07-20 11:34:56");
+    checkSimplify(cast(timestampLTZChar3, timestampLTZType),
+        "2011-07-20 12:34:56");
+    checkSimplify(cast(literalTimestampLTZ, timestampLTZType),
+        "2011-07-20 08:23:45");
+    checkSimplify(cast(literalDate, timestampLTZType),
+        "2011-07-20 07:00:00");
+    checkSimplify(cast(literalTime, timestampLTZType),
+        "2011-07-20 19:34:56");
+    checkSimplify(cast(literalTimestamp, timestampLTZType),
+        "2011-07-20 19:34:56");
+    checkSimplify(cast(literalTimestamp, dateType),
+        "2011-07-20");
+    checkSimplify(cast(literalTimestampLTZ, dateType),
+        "2011-07-20");
+    checkSimplify(cast(literalTimestampLTZ, timeType),
+        "01:23:45");
+    checkSimplify(cast(literalTimestampLTZ, timestampType),
+        "2011-07-20 01:23:45");
+    checkSimplify(cast(literalTimeLTZ, timeType),
+        "17:23:45");
+    checkSimplify(cast(literalTime, timeLTZType),
+        "20:34:56");
+    checkSimplify(cast(literalTimestampLTZ, timeLTZType),
+        "08:23:45");
+    checkSimplify(cast(literalTimeLTZ, varCharType),
+        "'17:23:45 America/Los_Angeles'");
+    checkSimplify(cast(literalTimestampLTZ, varCharType),
+        "'2011-07-20 01:23:45 America/Los_Angeles'");
+    checkSimplify(cast(literalTimeLTZ, timestampType),
+        "2011-07-19 18:23:45");
+    checkSimplify(cast(literalTimeLTZ, timestampLTZType),
+        "2011-07-20 01:23:45");
+  }
+
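
To see where one of the expected strings above comes from: casting DATE '2011-07-20' to TIMESTAMP WITH LOCAL TIME ZONE interprets the date as local midnight in America/Los_Angeles, which is UTC-7 on that summer date, hence "2011-07-20 07:00:00". The same arithmetic can be reproduced with TimestampWithTimeZoneString, as this commit does in DruidDateTimeUtils (a hedged fragment; DateTimeUtils here is org.apache.calcite.avatica.util.DateTimeUtils):

    TimestampString utc =
        new TimestampWithTimeZoneString("2011-07-20 00:00:00 America/Los_Angeles")
            .withTimeZone(DateTimeUtils.UTC_ZONE)
            .getLocalTimestampString();   // "2011-07-20 07:00:00"
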
+  @Test public void testCompareTimestampWithTimeZone() {
+    final TimestampWithTimeZoneString timestampLTZChar1 =
+        new TimestampWithTimeZoneString("2011-07-20 10:34:56 America/Los_Angeles");
+    final TimestampWithTimeZoneString timestampLTZChar2 =
+        new TimestampWithTimeZoneString("2011-07-20 19:34:56 Europe/Rome");
+    final TimestampWithTimeZoneString timestampLTZChar3 =
+        new TimestampWithTimeZoneString("2011-07-20 01:34:56 Asia/Tokyo");
+    final TimestampWithTimeZoneString timestampLTZChar4 =
+        new TimestampWithTimeZoneString("2011-07-20 10:34:56 America/Los_Angeles");
+
+    assertThat(timestampLTZChar1.equals(timestampLTZChar2), is(false));
+    assertThat(timestampLTZChar1.equals(timestampLTZChar3), is(false));
+    assertThat(timestampLTZChar1.equals(timestampLTZChar4), is(true));
+  }
+
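
Note that timestampLTZChar1 and timestampLTZChar2 above denote the same instant (10:34:56 PDT and 19:34:56 CEST are both 17:34:56 UTC), yet equals() is false: the comparison is on the written (timestamp, time zone) pair, not on the instant. A hedged fragment showing how the instants themselves would be compared, assuming the fields above are in scope and that withTimeZone converts as it does in DruidDateTimeUtils below:

    boolean sameInstant =
        timestampLTZChar1.withTimeZone(DateTimeUtils.UTC_ZONE)
            .equals(timestampLTZChar2.withTimeZone(DateTimeUtils.UTC_ZONE)); // true
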
   @Test public void testSimplifyLiterals() {
     final RexLiteral literalAbc = rexBuilder.makeLiteral("abc");
     final RexLiteral literalDef = rexBuilder.makeLiteral("def");

http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 3be8d20..1951396 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -512,7 +512,7 @@ class DruidConnectionImpl implements DruidConnection {
               JsonSegmentMetadata.class);
       final List<JsonSegmentMetadata> list = mapper.readValue(in, listType);
       in.close();
-      fieldBuilder.put(timestampColumnName, SqlTypeName.TIMESTAMP);
+      fieldBuilder.put(timestampColumnName, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
       for (JsonSegmentMetadata o : list) {
         for (Map.Entry<String, JsonColumn> entry : o.columns.entrySet()) {
           if (entry.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {

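Because the Druid time column now surfaces as TIMESTAMP_WITH_LOCAL_TIME_ZONE, clients see the new type name in metadata, and the integration tests below cast "__time" back to plain TIMESTAMP. A hedged sketch of the metadata effect (the model file name and wrapper class are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class DruidTimeColumnTypeSketch {
      public static void main(String[] args) throws Exception {
        try (Connection c = DriverManager.getConnection(
                 "jdbc:calcite:model=druid-model.json");   // placeholder model
             ResultSet r = c.getMetaData()
                 .getColumns(null, null, "wikiticker", "__time")) {
          while (r.next()) {
            // Expected per testMetadataColumns below:
            // TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)
            System.out.println(r.getString("TYPE_NAME"));
          }
        }
      }
    }
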
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 0328882..fb69353 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -26,6 +27,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.DateString;
 import org.apache.calcite.util.TimestampString;
+import org.apache.calcite.util.TimestampWithTimeZoneString;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
@@ -40,6 +42,7 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.TimeZone;
 
 /**
  * Utilities for generating intervals from RexNode.
@@ -57,9 +60,9 @@ public class DruidDateTimeUtils {
    * expression. Assumes that all the predicates in the input
    * reference a single column: the timestamp column.
    */
-  public static List<LocalInterval> createInterval(RelDataType type,
-      RexNode e) {
-    final List<Range<TimestampString>> ranges = extractRanges(e, false);
+  public static List<LocalInterval> createInterval(RexNode e, String timeZone) {
+    final List<Range<TimestampString>> ranges =
+        extractRanges(e, TimeZone.getTimeZone(timeZone), false);
     if (ranges == null) {
       // We did not succeed, bail out
       return null;
@@ -71,10 +74,12 @@ public class DruidDateTimeUtils {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Inferred ranges on interval : " + condensedRanges);
     }
-    return toInterval(ImmutableList.<Range>copyOf(condensedRanges.asRanges()));
+    return toInterval(
+        ImmutableList.<Range>copyOf(condensedRanges.asRanges()));
   }
 
-  protected static List<LocalInterval> toInterval(List<Range<TimestampString>> ranges) {
+  protected static List<LocalInterval> toInterval(
+      List<Range<TimestampString>> ranges) {
     List<LocalInterval> intervals = Lists.transform(ranges,
         new Function<Range<TimestampString>, LocalInterval>() {
           public LocalInterval apply(Range<TimestampString> range) {
@@ -105,7 +110,7 @@ public class DruidDateTimeUtils {
   }
 
   protected static List<Range<TimestampString>> extractRanges(RexNode node,
-      boolean withNot) {
+      TimeZone timeZone, boolean withNot) {
     switch (node.getKind()) {
     case EQUALS:
     case LESS_THAN:
@@ -114,16 +119,17 @@ public class DruidDateTimeUtils {
     case GREATER_THAN_OR_EQUAL:
     case BETWEEN:
     case IN:
-      return leafToRanges((RexCall) node, withNot);
+      return leafToRanges((RexCall) node, timeZone, withNot);
 
     case NOT:
-      return extractRanges(((RexCall) node).getOperands().get(0), !withNot);
+      return extractRanges(((RexCall) node).getOperands().get(0), timeZone, !withNot);
 
     case OR: {
       RexCall call = (RexCall) node;
       List<Range<TimestampString>> intervals = Lists.newArrayList();
       for (RexNode child : call.getOperands()) {
-        List<Range<TimestampString>> extracted = extractRanges(child, withNot);
+        List<Range<TimestampString>> extracted =
+            extractRanges(child, timeZone, withNot);
         if (extracted != null) {
           intervals.addAll(extracted);
         }
@@ -135,7 +141,8 @@ public class DruidDateTimeUtils {
       RexCall call = (RexCall) node;
       List<Range<TimestampString>> ranges = new ArrayList<>();
       for (RexNode child : call.getOperands()) {
-        List<Range<TimestampString>> extractedRanges = extractRanges(child, false);
+        List<Range<TimestampString>> extractedRanges =
+            extractRanges(child, timeZone, false);
         if (extractedRanges == null || extractedRanges.isEmpty()) {
          // We could not extract; bail out
           return null;
@@ -163,7 +170,7 @@ public class DruidDateTimeUtils {
   }
 
   protected static List<Range<TimestampString>> leafToRanges(RexCall call,
-      boolean withNot) {
+      TimeZone timeZone, boolean withNot) {
     switch (call.getKind()) {
     case EQUALS:
     case LESS_THAN:
@@ -173,11 +180,11 @@ public class DruidDateTimeUtils {
     {
       final TimestampString value;
       if (call.getOperands().get(0) instanceof RexInputRef
-          && literalValue(call.getOperands().get(1)) != null) {
-        value = literalValue(call.getOperands().get(1));
+          && literalValue(call.getOperands().get(1), timeZone) != null) {
+        value = literalValue(call.getOperands().get(1), timeZone);
       } else if (call.getOperands().get(1) instanceof RexInputRef
-          && literalValue(call.getOperands().get(0)) != null) {
-        value = literalValue(call.getOperands().get(0));
+          && literalValue(call.getOperands().get(0), timeZone) != null) {
+        value = literalValue(call.getOperands().get(0), timeZone);
       } else {
         return null;
       }
@@ -201,10 +208,10 @@ public class DruidDateTimeUtils {
     {
       final TimestampString value1;
       final TimestampString value2;
-      if (literalValue(call.getOperands().get(2)) != null
-          && literalValue(call.getOperands().get(3)) != null) {
-        value1 = literalValue(call.getOperands().get(2));
-        value2 = literalValue(call.getOperands().get(3));
+      if (literalValue(call.getOperands().get(2), timeZone) != null
+          && literalValue(call.getOperands().get(3), timeZone) != null) {
+        value1 = literalValue(call.getOperands().get(2), timeZone);
+        value2 = literalValue(call.getOperands().get(3), timeZone);
       } else {
         return null;
       }
@@ -219,9 +226,10 @@ public class DruidDateTimeUtils {
     }
     case IN:
     {
-      ImmutableList.Builder<Range<TimestampString>> ranges = ImmutableList.builder();
+      ImmutableList.Builder<Range<TimestampString>> ranges =
+          ImmutableList.builder();
       for (RexNode operand : Util.skip(call.operands)) {
-        final TimestampString element = literalValue(operand);
+        final TimestampString element = literalValue(operand, timeZone);
         if (element == null) {
           return null;
         }
@@ -239,16 +247,24 @@ public class DruidDateTimeUtils {
     }
   }
 
-  private static TimestampString literalValue(RexNode node) {
+  private static TimestampString literalValue(RexNode node, TimeZone timeZone) {
     switch (node.getKind()) {
     case LITERAL:
       switch (((RexLiteral) node).getTypeName()) {
-      case TIMESTAMP:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         return ((RexLiteral) node).getValueAs(TimestampString.class);
+      case TIMESTAMP:
+        // Cast timestamp to timestamp with local time zone
+        final TimestampString t = ((RexLiteral) node).getValueAs(TimestampString.class);
+        return new TimestampWithTimeZoneString(t.toString() + " " + timeZone.getID())
+            .withTimeZone(DateTimeUtils.UTC_ZONE).getLocalTimestampString();
       case DATE:
-        // For uniformity, treat dates as timestamps
+        // Cast date to timestamp with local time zone
         final DateString d = ((RexLiteral) node).getValueAs(DateString.class);
-        return TimestampString.fromMillisSinceEpoch(d.getMillisSinceEpoch());
+        return new TimestampWithTimeZoneString(
+            TimestampString.fromMillisSinceEpoch(
+                d.getMillisSinceEpoch()).toString() + " " + timeZone.getID())
+            .withTimeZone(DateTimeUtils.UTC_ZONE).getLocalTimestampString();
       }
       break;
     case CAST:
@@ -262,11 +278,13 @@ public class DruidDateTimeUtils {
       final RelDataType callType = call.getType();
       final RelDataType operandType = operand.getType();
       if (operand.getKind() == SqlKind.LITERAL
-          && callType.getSqlTypeName() == SqlTypeName.TIMESTAMP
+          && callType.getSqlTypeName() == operandType.getSqlTypeName()
+          && (callType.getSqlTypeName() == SqlTypeName.DATE
+              || callType.getSqlTypeName() == SqlTypeName.TIMESTAMP
+              || callType.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
           && callType.isNullable()
-          && operandType.getSqlTypeName() == SqlTypeName.TIMESTAMP
           && !operandType.isNullable()) {
-        return literalValue(operand);
+        return literalValue(operand, timeZone);
       }
     }
     return null;

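The heart of the literalValue() change above: a plain TIMESTAMP or DATE literal is reinterpreted in the connection time zone and then shifted to UTC, so the resulting interval endpoints are real instants. A self-contained sketch of the TIMESTAMP branch (values and wrapper class illustrative):

    import org.apache.calcite.avatica.util.DateTimeUtils;
    import org.apache.calcite.util.TimestampString;
    import org.apache.calcite.util.TimestampWithTimeZoneString;

    public class LiteralValueConversionSketch {
      public static void main(String[] args) {
        TimestampString t = new TimestampString("2011-07-20 12:34:56");
        // Append the connection zone, then shift to UTC -- the same
        // expression used in literalValue above.
        TimestampString utc =
            new TimestampWithTimeZoneString(t.toString() + " America/Los_Angeles")
                .withTimeZone(DateTimeUtils.UTC_ZONE)
                .getLocalTimestampString();
        System.out.println(utc);   // 2011-07-20 19:34:56 (LA is UTC-7 that day)
      }
    }
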
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index b12ad9a..b4a069b 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -512,7 +512,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       ImmutableBitSet numericCollationIndexes, Integer fetch, Project postProject) {
     final CalciteConnectionConfig config = getConnectionConfig();
     QueryType queryType = QueryType.SELECT;
-    final Translator translator = new Translator(druidTable, rowType);
+    final Translator translator = new Translator(druidTable, rowType, config.timeZone());
     List<String> fieldNames = rowType.getFieldNames();
     Set<String> usedFieldNames = Sets.newHashSet(fieldNames);
 
@@ -564,7 +564,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
               String extractColumnName = SqlValidatorUtil.uniquify(EXTRACT_COLUMN_NAME_PREFIX,
                   usedFieldNames, SqlValidatorUtil.EXPR_SUGGESTER);
               timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
-                  extractColumnName);
+                  extractColumnName, config.timeZone());
               dimensions.add(timeExtractionDimensionSpec);
               builder.add(extractColumnName);
               assert timePositionIdx == -1;
@@ -587,7 +587,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
                         + "_" + funcGranularity.value, usedFieldNames,
                     SqlValidatorUtil.EXPR_SUGGESTER);
                 timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeTimeExtract(
-                    funcGranularity, extractColumnName);
+                    funcGranularity, extractColumnName, config.timeZone());
                 dimensions.add(timeExtractionDimensionSpec);
                 builder.add(extractColumnName);
                 break;
@@ -600,7 +600,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
                       SqlValidatorUtil.EXPR_SUGGESTER);
                   dimensions.add(
                       TimeExtractionDimensionSpec.makeTimeFloor(funcGranularity,
-                          extractColumnName));
+                          extractColumnName, config.timeZone()));
                   finalGranularity = Granularity.ALL;
                   builder.add(extractColumnName);
                 } else {
@@ -632,7 +632,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
             String extractColumnName = SqlValidatorUtil.uniquify(EXTRACT_COLUMN_NAME_PREFIX,
                 usedFieldNames, SqlValidatorUtil.EXPR_SUGGESTER);
             timeExtractionDimensionSpec = TimeExtractionDimensionSpec.makeFullTimeExtract(
-                extractColumnName);
+                extractColumnName, config.timeZone());
             dimensions.add(timeExtractionDimensionSpec);
             builder.add(extractColumnName);
             assert timePositionIdx == -1;
@@ -1083,8 +1083,9 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     final List<String> metrics = new ArrayList<>();
     final DruidTable druidTable;
     final RelDataType rowType;
+    final String timeZone;
 
-    Translator(DruidTable druidTable, RelDataType rowType) {
+    Translator(DruidTable druidTable, RelDataType rowType, String timeZone) {
       this.druidTable = druidTable;
       this.rowType = rowType;
       for (RelDataTypeField f : rowType.getFieldList()) {
@@ -1096,6 +1097,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
           dimensions.add(fieldName);
         }
       }
+      this.timeZone = timeZone;
     }
 
     protected void clearFieldNameLists() {
@@ -1169,7 +1171,8 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
          // If there is no extraction function, the field will be omitted from the serialization
         ExtractionFunction extractionFunction = null;
         if (granularity != null) {
-          extractionFunction = TimeExtractionFunction.createExtractFromGranularity(granularity);
+          extractionFunction =
+              TimeExtractionFunction.createExtractFromGranularity(granularity, timeZone);
         }
         String dimName = tr(e, posRef);
         if (dimName.equals(DruidConnectionImpl.DEFAULT_RESPONSE_TIMESTAMP_COLUMN)) {
@@ -1279,7 +1282,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
 
     private ColumnMetaData.Rep getPrimitive(RelDataTypeField field) {
       switch (field.getType().getSqlTypeName()) {
-      case TIMESTAMP:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         return ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP;
       case BIGINT:
         return ColumnMetaData.Rep.LONG;

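The Translator now carries the connection time zone, which DruidQuery reads from CalciteConnectionConfig.timeZone(). A hedged sketch of how a client sets that zone; the "timeZone" property name is an assumption based on the camel-cased config method:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class ConnectionTimeZoneSketch {
      public static void main(String[] args) throws Exception {
        Properties info = new Properties();
        info.setProperty("timeZone", "America/Los_Angeles"); // assumed property name
        try (Connection c =
                 DriverManager.getConnection("jdbc:calcite:", info)) {
          System.out.println(c.getMetaData().getDatabaseProductName());
        }
      }
    }
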
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 343f03e..562e568 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -230,8 +230,8 @@ public class DruidRules {
       List<LocalInterval> intervals = null;
       if (!triple.getLeft().isEmpty()) {
         intervals = DruidDateTimeUtils.createInterval(
-            query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
-            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false));
+            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false),
+            cluster.getPlanner().getContext().unwrap(CalciteConnectionConfig.class).timeZone());
         if (intervals == null || intervals.isEmpty()) {
          // Case where we have a filter with EXTRACT that cannot be written as an interval push-down
           triple.getMiddle().addAll(triple.getLeft());
@@ -579,7 +579,7 @@ public class DruidRules {
         case MINUS:
         case DIVIDE:
         case TIMES:
-        case CAST:
+        // case CAST: disabled by [CALCITE-1947]
           return true;
         default:
           return false;

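The rule now pulls the same zone out of the planner context instead of inferring anything from the row type. A minimal fragment of that unwrap idiom, with a null check added here as a precaution (the rule itself assumes the config is present):

    // Fragment, assuming a RelOptCluster 'cluster' in scope as in the rule:
    CalciteConnectionConfig config =
        cluster.getPlanner().getContext().unwrap(CalciteConnectionConfig.class);
    String timeZone = config != null ? config.timeZone() : "UTC";
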
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
index d636ce8..d34e000 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
@@ -59,7 +59,7 @@ public class DruidTableFactory implements TableFactory {
     } else {
       timestampColumnName = DruidTable.DEFAULT_TIMESTAMP_COLUMN;
     }
-    fieldBuilder.put(timestampColumnName, SqlTypeName.TIMESTAMP);
+    fieldBuilder.put(timestampColumnName, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
     final Object dimensionsRaw = operand.get("dimensions");
     if (dimensionsRaw instanceof List) {
       // noinspection unchecked

http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
index 656ee77..7ef19a6 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionDimensionSpec.java
@@ -34,9 +34,10 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
    *
    * @return the time extraction DimensionSpec instance
    */
-  public static TimeExtractionDimensionSpec makeFullTimeExtract(String outputName) {
+  public static TimeExtractionDimensionSpec makeFullTimeExtract(
+      String outputName, String timeZone) {
     return new TimeExtractionDimensionSpec(
-        TimeExtractionFunction.createDefault(), outputName);
+        TimeExtractionFunction.createDefault(timeZone), outputName);
   }
 
   /**
@@ -51,9 +52,9 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
    * is not supported
    */
   public static TimeExtractionDimensionSpec makeTimeExtract(
-      Granularity granularity, String outputName) {
+      Granularity granularity, String outputName, String timeZone) {
     return new TimeExtractionDimensionSpec(
-        TimeExtractionFunction.createExtractFromGranularity(granularity), outputName);
+        TimeExtractionFunction.createExtractFromGranularity(granularity, timeZone), outputName);
   }
 
   /**
@@ -64,8 +65,9 @@ public class TimeExtractionDimensionSpec extends ExtractionDimensionSpec {
    * @return floor time extraction DimensionSpec instance.
    */
   public static TimeExtractionDimensionSpec makeTimeFloor(Granularity granularity,
-      String outputName) {
-    ExtractionFunction fn = TimeExtractionFunction.createFloorFromGranularity(granularity);
+      String outputName, String timeZone) {
+    ExtractionFunction fn =
+        TimeExtractionFunction.createFloorFromGranularity(granularity, timeZone);
     return new TimeExtractionDimensionSpec(fn, outputName);
   }
 }

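A hedged usage sketch of the updated factories (the output name "floor_day" matches the expectation in DruidAdapterIT below; everything else follows the signatures above):

    TimeExtractionDimensionSpec spec =
        TimeExtractionDimensionSpec.makeTimeFloor(
            Granularity.DAY, "floor_day", "America/Los_Angeles");
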
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
index 22733be..b1f8870 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/TimeExtractionFunction.java
@@ -75,8 +75,8 @@ public class TimeExtractionFunction implements ExtractionFunction {
    *
    * @return the time extraction function
    */
-  public static TimeExtractionFunction createDefault() {
-    return new TimeExtractionFunction(ISO_TIME_FORMAT, null, "UTC", null);
+  public static TimeExtractionFunction createDefault(String timeZone) {
+    return new TimeExtractionFunction(ISO_TIME_FORMAT, null, timeZone, null);
   }
 
   /**
@@ -87,16 +87,18 @@ public class TimeExtractionFunction implements ExtractionFunction {
   * @return the time extraction function corresponding to the granularity input
   * unit; see {@link TimeExtractionFunction#VALID_TIME_EXTRACT} for the supported granularities
    */
-  public static TimeExtractionFunction createExtractFromGranularity(Granularity granularity) {
+  public static TimeExtractionFunction createExtractFromGranularity(
+      Granularity granularity, String timeZone) {
     switch (granularity) {
     case DAY:
-      return new TimeExtractionFunction("d", null, "UTC", Locale.getDefault().toLanguageTag());
+      return new TimeExtractionFunction("d", null, timeZone, Locale.getDefault().toLanguageTag());
     case MONTH:
-      return new TimeExtractionFunction("M", null, "UTC", Locale.getDefault().toLanguageTag());
+      return new TimeExtractionFunction("M", null, timeZone, Locale.getDefault().toLanguageTag());
     case YEAR:
-      return new TimeExtractionFunction("yyyy", null, "UTC", Locale.getDefault().toLanguageTag());
+      return new TimeExtractionFunction("yyyy", null, timeZone,
+          Locale.getDefault().toLanguageTag());
     case WEEK:
-      return new TimeExtractionFunction("w", null, "UTC", Locale.getDefault().toLanguageTag());
+      return new TimeExtractionFunction("w", null, timeZone, Locale.getDefault().toLanguageTag());
     default:
       throw new IllegalArgumentException("Granularity [" + granularity + "] is not supported");
     }
@@ -108,8 +110,9 @@ public class TimeExtractionFunction implements ExtractionFunction {
    * @param granularity granularity to apply to the column
    * @return the time extraction function or null if granularity is not supported
    */
-  public static TimeExtractionFunction createFloorFromGranularity(Granularity granularity) {
-    return new TimeExtractionFunction(ISO_TIME_FORMAT, granularity.value, "UTC", Locale
+  public static TimeExtractionFunction createFloorFromGranularity(
+      Granularity granularity, String timeZone) {
+    return new TimeExtractionFunction(ISO_TIME_FORMAT, granularity.value, timeZone, Locale
         .getDefault().toLanguageTag());
   }
 

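With the zone threaded through, the extraction functions serialize with that zone instead of the hard-coded "UTC". A hedged illustration, with the JSON shape inferred from the 'timeFormat' extractionFn fragments in the test expectations (field order assumed):

    // createExtractFromGranularity(Granularity.MONTH, "America/Los_Angeles")
    // would serialize roughly as:
    //   {'type':'timeFormat','format':'M',
    //    'timeZone':'America/Los_Angeles','locale':'en-US'}
    TimeExtractionFunction monthFn =
        TimeExtractionFunction.createExtractFromGranularity(
            Granularity.MONTH, "America/Los_Angeles");
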
http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
index b2e8635..e8e42be 100644
--- a/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
+++ b/druid/src/test/java/org/apache/calcite/adapter/druid/DruidQueryFilterTest.java
@@ -121,7 +121,7 @@ public class DruidQueryFilterTest {
         .add("dimensionName", varcharType)
         .build();
     final DruidQuery.Translator translatorStringKind =
-        new DruidQuery.Translator(druidTable, varcharRowType);
+        new DruidQuery.Translator(druidTable, varcharRowType, "UTC");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index e88aaed..1a0d3d3 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -235,12 +235,11 @@ public class DruidAdapterIT {
   @Test public void testSelectTimestampColumnNoTables2() {
     // Since columns are not explicitly declared, we use the default time
     // column in the query.
-    final String sql = "select \"__time\"\n"
+    final String sql = "select cast(\"__time\" as timestamp) as \"__time\"\n"
         + "from \"wikiticker\"\n"
         + "limit 1\n";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$0]], fetch=[1])\n";
+    final String explain =
+        "DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$0]], fetch=[1])\n";
     final String druidQuery = "{'queryType':'select',"
         + "'dataSource':'wikiticker','descending':false,"
         + "'intervals':['1900-01-01T00:00:00.000/3000-01-01T00:00:00.000'],"
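
A pattern repeated throughout the updated tests in this file: "__time" is now TIMESTAMP WITH LOCAL TIME ZONE, so the tests cast it back to plain TIMESTAMP to keep the expected strings zone-independent, and string comparisons against it carry an explicit zone suffix. The shape, condensed from the tests below:

    final String sql = "select cast(\"__time\" as timestamp) as \"__time\"\n"
        + "from \"wikiticker\"\n"
        + "where \"__time\" < '2015-10-12 00:00:00 UTC'";
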
@@ -255,12 +254,12 @@ public class DruidAdapterIT {
   @Test public void testSelectTimestampColumnNoTables3() {
     // Since columns are not explicitly declared, we use the default time
     // column in the query.
-    final String sql = "select floor(\"__time\" to DAY) as \"day\", sum(\"added\")\n"
+    final String sql =
+        "select cast(floor(\"__time\" to DAY) as timestamp) as \"day\", sum(\"added\")\n"
         + "from \"wikiticker\"\n"
         + "group by floor(\"__time\" to DAY)";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[FLOOR($0, FLAG(DAY)), $1]], groups=[{0}], aggs=[[SUM($1)]])\n";
+    final String explain =
+        "DruidQuery(table=[[wiki, wikiticker]], intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[FLOOR($0, FLAG(DAY)), $1]], groups=[{0}], aggs=[[SUM($1)]])\n";
     final String druidQuery = "{'queryType':'timeseries',"
         + "'dataSource':'wikiticker','descending':false,'granularity':'day',"
         + "'aggregations':[{'type':'longSum','name':'EXPR$1','fieldName':'added'}],"
@@ -276,12 +275,12 @@ public class DruidAdapterIT {
     // Since columns are not explicitly declared, we use the default time
     // column in the query.
     final String sql = "select sum(\"added\") as \"s\", \"page\", "
-        + "floor(\"__time\" to DAY) as \"day\"\n"
+        + "cast(floor(\"__time\" to DAY) as timestamp) as \"day\"\n"
         + "from \"wikiticker\"\n"
         + "group by \"page\", floor(\"__time\" to DAY)\n"
         + "order by \"s\" desc";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableProject(s=[$2], page=[$0], day=[$1])\n"
+        + "  BindableProject(s=[$2], page=[$0], day=[CAST($1):TIMESTAMP(0)])\n"
         + "    DruidQuery(table=[[wiki, wikiticker]], "
         + "intervals=[[1900-01-01T00:00:00.000/3000-01-01T00:00:00.000]], projects=[[$17, FLOOR"
         + "($0, FLAG(DAY)), $1]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC])";
@@ -296,7 +295,8 @@ public class DruidAdapterIT {
   }
 
   @Test public void testSkipEmptyBuckets() {
-    final String sql = "select floor(\"__time\" to SECOND) as \"second\", sum(\"added\")\n"
+    final String sql =
+        "select cast(floor(\"__time\" to SECOND) as timestamp) as \"second\", sum(\"added\")\n"
         + "from \"wikiticker\"\n"
         + "where \"page\" = 'Jeremy Corbyn'\n"
         + "group by floor(\"__time\" to SECOND)";
@@ -334,12 +334,10 @@ public class DruidAdapterIT {
    * Druid adapter: Send timestamp literals to Druid as local time, not
    * UTC</a>. */
   @Test public void testFilterTime() {
-    final String sql = "select \"__time\"\n"
+    final String sql = "select cast(\"__time\" as timestamp) as \"__time\"\n"
         + "from \"wikiticker\"\n"
-        + "where \"__time\" < '2015-10-12 00:00:00'";
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[wiki, wikiticker]], "
+        + "where \"__time\" < '2015-10-12 00:00:00 UTC'";
+    final String explain = "\n    DruidQuery(table=[[wiki, wikiticker]], "
         + "intervals=[[1900-01-01T00:00:00.000/2015-10-12T00:00:00.000]], "
         + "projects=[[$0]])\n";
     final String druidQuery = "{'queryType':'select',"
@@ -357,12 +355,14 @@ public class DruidAdapterIT {
   }
 
   @Test public void testFilterTimeDistinct() {
-    final String sql = "select distinct \"__time\"\n"
+    final String sql = "select CAST(\"c1\" AS timestamp) as \"__time\" from\n"
+        + "(select distinct \"__time\" as \"c1\"\n"
         + "from \"wikiticker\"\n"
-        + "where \"__time\" < '2015-10-12 00:00:00'";
+        + "where \"__time\" < '2015-10-12 00:00:00 UTC')";
     final String explain = "PLAN="
         + "EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[wiki, wikiticker]], "
+        + "  BindableProject(__time=[CAST($0):TIMESTAMP(0)])\n"
+        + "    DruidQuery(table=[[wiki, wikiticker]], "
         + "intervals=[[1900-01-01T00:00:00.000/2015-10-12T00:00:00.000]], "
         + "groups=[{0}], aggs=[[]])\n";
     final String subDruidQuery = "{'queryType':'groupBy','dataSource':'wikiticker',"
@@ -371,10 +371,10 @@ public class DruidAdapterIT {
         + "'extractionFn':{'type':'timeFormat'";
     sql(sql, WIKI_AUTO2)
         .limit(2)
-        .returnsUnordered("__time=2015-09-12 00:46:58",
-            "__time=2015-09-12 00:47:00")
         .explainContains(explain)
-        .queryContains(druidChecker(subDruidQuery));
+        .queryContains(druidChecker(subDruidQuery))
+        .returnsUnordered("__time=2015-09-12 00:46:58",
+            "__time=2015-09-12 00:47:00");
   }
 
   @Test public void testMetadataColumns() throws Exception {
@@ -390,10 +390,10 @@
                   while (r.next()) {
                     map.put(r.getString("TYPE_NAME"), true);
                   }
                   // 1 timestamp, 2 float measures, 1 int measure, 88 dimensions
                   assertThat(map.keySet().size(), is(4));
                   assertThat(map.values().size(), is(92));
-                  assertThat(map.get("TIMESTAMP(0)").size(), is(1));
+                  assertThat(map.get("TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)").size(), is(1));
                   assertThat(map.get("DOUBLE").size(), is(2));
                   assertThat(map.get("BIGINT").size(), is(1));
                   assertThat(map.get(VARCHAR_TYPE).size(), is(88));
@@ -689,18 +690,14 @@ public class DruidAdapterIT {
    * <p>Before CALCITE-1578 was fixed, this would use a "topN" query but return
    * the wrong results. */
   @Test public void testGroupByDaySortDescLimit() {
-    final String sql = "select \"brand_name\", floor(\"timestamp\" to DAY) as d,"
+    final String sql = "select \"brand_name\","
+        + " cast(floor(\"timestamp\" to DAY) as timestamp) as d,"
         + " sum(\"unit_sales\") as s\n"
         + "from \"foodmart\"\n"
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 30";
-    final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
-        + "'granularity':'day','dimensions':[{'type':'default','dimension':'brand_name'}],"
-        + "'limitSpec':{'type':'default'},"
-        + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'}],"
-        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain =
+        "    DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
         + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], "
         + "fetch=[30])";
@@ -725,7 +722,8 @@ public class DruidAdapterIT {
    * wrongly try to use a {@code limitSpec} to sort and filter. (A "topN" query
    * was not possible because the sort was {@code ASC}.) */
   @Test public void testGroupByDaySortLimit() {
-    final String sql = "select \"brand_name\", floor(\"timestamp\" to DAY) as d,"
+    final String sql = "select \"brand_name\","
+        + " cast(floor(\"timestamp\" to DAY) as timestamp) as d,"
         + " sum(\"unit_sales\") as s\n"
         + "from \"foodmart\"\n"
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
@@ -739,8 +737,7 @@ public class DruidAdapterIT {
         + "'dimensionOrder':'numeric'}]},'aggregations':[{'type':'longSum',"
         + "'name':'S','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain = "DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
         + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[2], dir0=[DESC], "
         + "fetch=[30])";
@@ -757,7 +754,8 @@ public class DruidAdapterIT {
    * <a href="https://issues.apache.org/jira/browse/CALCITE-1580">[CALCITE-1580]
    * Druid adapter: Wrong semantics for ordering within groupBy queries</a>. */
   @Test public void testGroupByDaySortDimension() {
-    final String sql = "select \"brand_name\", floor(\"timestamp\" to DAY) as d,"
+    final String sql =
+        "select \"brand_name\", cast(floor(\"timestamp\" to DAY) as timestamp) as d,"
         + " sum(\"unit_sales\") as s\n"
         + "from \"foodmart\"\n"
         + "group by \"brand_name\", floor(\"timestamp\" to DAY)\n"
@@ -766,8 +764,7 @@ public class DruidAdapterIT {
         + "'granularity':'all','dimensions':[{'type':'default',"
         + "'dimension':'brand_name'},{'type':'extraction','dimension':'__time',"
         + "'outputName':'floor_day','extractionFn':{'type':'timeFormat'";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain = "    DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, FLOOR"
         + "($0, FLAG(DAY)), $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[0], dir0=[ASC])";
     sql(sql)
@@ -790,8 +787,7 @@ public class DruidAdapterIT {
         + "'filter':{'type':'and','fields':["
         + "{'type':'bound','dimension':'product_id','lower':'1500','lowerStrict':false,'ordering':'lexicographic'},"
         + "{'type':'bound','dimension':'product_id','upper':'1502','upperStrict':false,'ordering':'lexicographic'}]},"
-        + "'dimensions':['product_name','state_province','product_id'],"
-        + "'metrics':[],'granularity':'all',"
+        + "'dimensions':['product_name','state_province','product_id'],'metrics':[],'granularity':'all',"
         + "'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
@@ -819,13 +815,13 @@ public class DruidAdapterIT {
     final String sql = "select \"product_name\" from \"foodmart\"\n"
         + "where \"product_id\" BETWEEN 1500 AND 1502\n"
         + "order by \"state_province\" desc, \"product_id\"";
-    final String druidQuery = "{'queryType':'select','dataSource':'foodmart','descending':false,"
-            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],'filter':{'type':"
-            + "'and','fields':[{'type':'bound','dimension':'product_id','lower':'1500',"
-            + "'lowerStrict':false,'ordering':'numeric'},{'type':'bound','dimension':'product_id',"
-            + "'upper':'1502','upperStrict':false,'ordering':'numeric'}]},'dimensions':"
-            + "['product_name','state_province','product_id'],'metrics':[],'granularity':'all','pagingSpec':"
-            + "{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
+    final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
+        + "'descending':false,'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+        + "'filter':{'type':'and','fields':["
+        + "{'type':'bound','dimension':'product_id','lower':'1500','lowerStrict':false,'ordering':'numeric'},"
+        + "{'type':'bound','dimension':'product_id','upper':'1502','upperStrict':false,'ordering':'numeric'}]},"
+        + "'dimensions':['product_name','state_province','product_id'],'metrics':[],'granularity':'all',"
+        + "'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
         .returns(
@@ -854,8 +850,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
         + "'descending':false,'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
         + "'filter':{'type':'selector','dimension':'product_id','value':'-1'},"
-        + "'dimensions':['product_name'],"
-        + "'metrics':[],'granularity':'all',"
+        + "'dimensions':['product_name'],'metrics':[],'granularity':'all',"
         + "'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
@@ -871,8 +866,7 @@ public class DruidAdapterIT {
         + "order by \"state_province\" desc, \"product_id\"";
     final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
         + "'descending':false,'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
-        + "'dimensions':['product_id','product_name','state_province'],"
-        + "'metrics':[],'granularity':'all',"
+        + "'dimensions':['product_id','product_name','state_province'],'metrics':[],'granularity':'all',"
         + "'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
@@ -959,7 +953,8 @@ public class DruidAdapterIT {
    * "topN" because we have a global limit, and that requires
    * {@code granularity: all}. */
   @Test public void testGroupByTimeAndOneColumnNotProjectedWithLimit() {
-    final String sql = "select count(*) as \"c\", floor(\"timestamp\" to MONTH) as \"month\"\n"
+    final String sql = "select count(*) as \"c\","
+        + " cast(floor(\"timestamp\" to MONTH) as timestamp) as \"month\"\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH), \"state_province\"\n"
         + "order by \"c\" desc limit 3";
@@ -972,7 +967,7 @@ public class DruidAdapterIT {
 
   @Test public void testGroupByTimeAndOneMetricNotProjected() {
     final String sql =
-            "select count(*) as \"c\", floor(\"timestamp\" to MONTH) as \"month\", floor"
+            "select count(*) as \"c\", cast(floor(\"timestamp\" to MONTH) as timestamp) as \"month\", floor"
                     + "(\"store_sales\") as sales\n"
                     + "from \"foodmart\"\n"
                     + "group by floor(\"timestamp\" to MONTH), \"state_province\", floor"
@@ -986,7 +981,7 @@ public class DruidAdapterIT {
 
   @Test public void testGroupByTimeAndOneColumnNotProjected() {
     final String sql = "select count(*) as \"c\",\n"
-        + "  floor(\"timestamp\" to MONTH) as \"month\"\n"
+        + "  cast(floor(\"timestamp\" to MONTH) as timestamp) as \"month\"\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH), \"state_province\"\n"
         + "having count(*) > 3500";
@@ -1066,46 +1061,45 @@ public class DruidAdapterIT {
    * <a href="https://issues.apache.org/jira/browse/CALCITE-1577">[CALCITE-1577]
    * Druid adapter: Incorrect result - limit on timestamp disappears</a>. */
   @Test public void testGroupByMonthGranularitySort() {
-    final String sql = "select floor(\"timestamp\" to MONTH) as m,\n"
-        + " sum(\"unit_sales\") as s,\n"
+    final String sql = "select sum(\"unit_sales\") as s,\n"
         + " count(\"store_sqft\") as c\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) ASC";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
-        + "    BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
-        + "      BindableProject(M=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n"
-        + "        DruidQuery(table=[[foodmart, foodmart]], "
+        + "  BindableSort(sort0=[$2], dir0=[ASC])\n"
+        + "    BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n"
+        + "      BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
+        + "        BindableProject($f0=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n"
+        + "          DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$0, $71, $89]])";
     sql(sql)
-        .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=5957",
-            "M=1997-02-01 00:00:00; S=20957; C=5842",
-            "M=1997-03-01 00:00:00; S=23706; C=6528",
-            "M=1997-04-01 00:00:00; S=20179; C=5523",
-            "M=1997-05-01 00:00:00; S=21081; C=5793",
-            "M=1997-06-01 00:00:00; S=21350; C=5863",
-            "M=1997-07-01 00:00:00; S=23763; C=6762",
-            "M=1997-08-01 00:00:00; S=21697; C=5915",
-            "M=1997-09-01 00:00:00; S=20388; C=5591",
-            "M=1997-10-01 00:00:00; S=19958; C=5606",
-            "M=1997-11-01 00:00:00; S=25270; C=7026",
-            "M=1997-12-01 00:00:00; S=26796; C=7338")
-        .explainContains(explain);
+        .explainContains(explain)
+        .returnsOrdered("S=21628; C=5957",
+                "S=20957; C=5842",
+                "S=23706; C=6528",
+                "S=20179; C=5523",
+                "S=21081; C=5793",
+                "S=21350; C=5863",
+                "S=23763; C=6762",
+                "S=21697; C=5915",
+                "S=20388; C=5591",
+                "S=19958; C=5606",
+                "S=25270; C=7026",
+                "S=26796; C=7338");
   }
 
   @Test public void testGroupByMonthGranularitySortLimit() {
-    final String sql = "select floor(\"timestamp\" to MONTH) as m,\n"
+    final String sql = "select cast(floor(\"timestamp\" to MONTH) as timestamp) as m,\n"
         + " sum(\"unit_sales\") as s,\n"
         + " count(\"store_sqft\") as c\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) limit 3";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n"
-        + "    BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
-        + "      BindableProject(M=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n"
-        + "        DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain = "BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n"
+        + "      BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
+        + "        BindableProject($f0=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n"
+        + "          DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$0, $71, $89]])";
     sql(sql)
         .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=5957",
@@ -1122,16 +1116,16 @@ public class DruidAdapterIT {
     String druidQuery = "{'queryType':'select','dataSource':'foodmart'";
     sql(sql)
         .limit(3)
-        .returnsUnordered("S=1244; C=391", "S=550; C=112", "S=580; C=171")
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker(druidQuery))
+        .returnsUnordered("S=1244; C=391", "S=550; C=112", "S=580; C=171");
   }
 
   @Test public void testGroupByMonthGranularityFiltered() {
     final String sql = "select sum(\"unit_sales\") as s,\n"
         + " count(\"store_sqft\") as c\n"
         + "from \"foodmart\"\n"
-        + "where \"timestamp\" >= '1996-01-01 00:00:00' and "
-        + " \"timestamp\" < '1998-01-01 00:00:00'\n"
+        + "where \"timestamp\" >= '1996-01-01 00:00:00 UTC' and "
+        + " \"timestamp\" < '1998-01-01 00:00:00 UTC'\n"
         + "group by floor(\"timestamp\" to MONTH)";
     String druidQuery = "{'queryType':'select','dataSource':'foodmart'";
     sql(sql)
@@ -1179,8 +1173,8 @@ public class DruidAdapterIT {
         + "max(\"unit_sales\") as m,\n"
         + "\"state_province\" as p\n"
         + "from \"foodmart\"\n"
-        + "where \"timestamp\" >= '1997-01-01 00:00:00' and "
-        + " \"timestamp\" < '1997-09-01 00:00:00'\n"
+        + "where \"timestamp\" >= '1997-01-01 00:00:00 UTC' and "
+        + " \"timestamp\" < '1997-09-01 00:00:00 UTC'\n"
         + "group by \"state_province\", floor(\"timestamp\" to DAY)\n"
         + "order by s desc limit 6";
     final String explain = "PLAN=EnumerableInterpreter\n"
@@ -1383,12 +1377,11 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "where extract(year from \"timestamp\") = 1997\n"
         + "and extract(month from \"timestamp\") in (4, 6)\n";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain = "DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[AND(="
-        + "(EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), 86400000)), 1997), OR(=(EXTRACT_DATE"
-        + "(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), 4), =(EXTRACT_DATE(FLAG(MONTH), /INT"
-        + "(Reinterpret($0), 86400000)), 6)))], groups=[{}], aggs=[[COUNT()]])";
+        + "(EXTRACT_DATE(FLAG(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), 1997), OR(=(EXTRACT_DATE"
+        + "(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), 4), =(EXTRACT_DATE(FLAG(MONTH), "
+        + "/INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), 6)))], groups=[{}], aggs=[[COUNT()]])";
     sql(sql)
         .explainContains(explain)
         .returnsUnordered("C=13500");
@@ -1430,17 +1423,17 @@ public class DruidAdapterIT {
   @Test public void testFieldBasedCostColumnPruning() {
     // A query where filter cannot be pushed to Druid but
     // the project can still be pushed in order to prune extra columns.
-    String sql = "select \"countryName\", floor(\"time\" to DAY),\n"
+    String sql = "select \"countryName\", floor(CAST(\"time\" AS TIMESTAMP) to DAY),\n"
         + "  cast(count(*) as integer) as c\n"
         + "from \"wiki\"\n"
-        + "where floor(\"time\" to DAY) >= '1997-01-01 00:00:00'\n"
-        + "and floor(\"time\" to DAY) < '1997-09-01 00:00:00'\n"
-        + "group by \"countryName\", floor(\"time\" TO DAY)\n"
+        + "where floor(\"time\" to DAY) >= '1997-01-01 00:00:00 UTC'\n"
+        + "and floor(\"time\" to DAY) < '1997-09-01 00:00:00 UTC'\n"
+        + "group by \"countryName\", floor(CAST(\"time\" AS TIMESTAMP) TO DAY)\n"
         + "order by c limit 5";
     String plan = "BindableProject(countryName=[$0], EXPR$1=[$1], C=[CAST($2):INTEGER NOT NULL])\n"
         + "    BindableSort(sort0=[$2], dir0=[ASC], fetch=[5])\n"
         + "      BindableAggregate(group=[{0, 1}], agg#0=[COUNT()])\n"
-        + "        BindableProject(countryName=[$1], EXPR$1=[FLOOR($0, FLAG(DAY))])\n"
+        + "        BindableProject(countryName=[$1], EXPR$1=[FLOOR(CAST($0):TIMESTAMP(0), FLAG(DAY))])\n"
         + "          BindableFilter(condition=[AND(>=(FLOOR($0, FLAG(DAY)), 1997-01-01 00:00:00), <(FLOOR($0, FLAG(DAY)), 1997-09-01 00:00:00))])\n"
         + "            DruidQuery(table=[[wiki, wiki]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$0, $5]])";
     // NOTE: Druid query only has countryName as the dimension
@@ -1459,10 +1452,11 @@ public class DruidAdapterIT {
   }
 
   @Test public void testGroupByMetricAndExtractTime() {
-    final String sql = "SELECT count(*), floor(\"timestamp\" to DAY), \"store_sales\" "
-            + "FROM \"foodmart\"\n"
-            + "GROUP BY \"store_sales\", floor(\"timestamp\" to DAY)\n ORDER BY \"store_sales\" DESC\n"
-            + "LIMIT 10\n";
+    final String sql =
+        "SELECT count(*), cast(floor(\"timestamp\" to DAY) as timestamp), \"store_sales\" "
+        + "FROM \"foodmart\"\n"
+        + "GROUP BY \"store_sales\", floor(\"timestamp\" to DAY)\n ORDER BY \"store_sales\" DESC\n"
+        + "LIMIT 10\n";
     sql(sql).queryContains(druidChecker("{\"queryType\":\"select\""));
   }
 
@@ -1475,10 +1469,11 @@ public class DruidAdapterIT {
   }
 
   @Test public void testPushAggregateOnTime() {
-    String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" "
+    String sql = "select \"product_id\", cast(\"timestamp\" as timestamp) as \"time\" "
+        + "from \"foodmart\" "
         + "where \"product_id\" = 1016 "
-        + "and \"timestamp\" < cast('1997-01-03' as timestamp) "
-        + "and \"timestamp\" > cast('1990-01-01' as timestamp) "
+        + "and \"timestamp\" < '1997-01-03 00:00:00 UTC' "
+        + "and \"timestamp\" > '1990-01-01 00:00:00 UTC' "
         + "group by \"timestamp\", \"product_id\" ";
     String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
         + "'granularity':'all','dimensions':[{'type':'extraction',"
@@ -1592,9 +1587,9 @@ public class DruidAdapterIT {
         .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
-            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
-            + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
-            + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), "
+            + "EXTRACT_DATE(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), EXTRACT_DATE(FLAG"
+            + "(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
         .returnsUnordered("day=2; month=1; year=1997; product_id=1016",
             "day=10; month=1; year=1997; product_id=1016",
             "day=13; month=1; year=1997; product_id=1016",
@@ -1626,9 +1621,9 @@ public class DruidAdapterIT {
         .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)"
-            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), "
-            + "EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), EXTRACT_DATE(FLAG"
-            + "(YEAR), /INT(Reinterpret($0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
+            + "], projects=[[EXTRACT_DATE(FLAG(DAY), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), "
+            + "EXTRACT_DATE(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), EXTRACT_DATE(FLAG"
+            + "(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), $1]], groups=[{0, 1, 2, 3}], aggs=[[]])\n")
         .returnsUnordered("EXPR$0=2; EXPR$1=1; EXPR$2=1997; product_id=1016",
             "EXPR$0=10; EXPR$1=1; EXPR$2=1997; product_id=1016",
             "EXPR$0=13; EXPR$1=1; EXPR$2=1997; product_id=1016",
@@ -1653,7 +1648,7 @@ public class DruidAdapterIT {
         .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1997-01-01T00:00:00.001/1997-01-20T00:00:00.000]], filter=[=($1, 1016)], "
-            + "projects=[[EXTRACT_DATE(FLAG(DAY), /INT(Reinterpret($0), 86400000)), $1]], "
+            + "projects=[[EXTRACT_DATE(FLAG(DAY), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), $1]], "
             + "groups=[{0, 1}], aggs=[[]])\n")
         .returnsUnordered("EXPR$0=2; dayOfMonth=1016", "EXPR$0=10; dayOfMonth=1016",
             "EXPR$0=13; dayOfMonth=1016", "EXPR$0=16; dayOfMonth=1016");
@@ -1682,7 +1677,7 @@ public class DruidAdapterIT {
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[AND(>=(CAST"
             + "($11):BIGINT, 8), <=(CAST($11):BIGINT, 10), <(CAST($10):BIGINT, 15), =(EXTRACT_DATE"
-            + "(FLAG(YEAR), /INT(Reinterpret($0), 86400000)), 1997))], groups=[{}], "
+            + "(FLAG(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), 1997))], groups=[{}], "
             + "aggs=[[SUM($90)]])")
         .queryContains(druidChecker(druidQuery))
         .returnsUnordered("EXPR$0=75364.09998679161");
@@ -1830,33 +1825,33 @@ public class DruidAdapterIT {
         .explainContains("PLAN=EnumerableInterpreter\n"
             + "  DruidQuery(table=[[foodmart, foodmart]], "
             + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
-            + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), "
+            + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), "
             + "86400000)), $1, $89]], groups=[{0, 1}], aggs=[[SUM($2)]], sort0=[0], sort1=[2], "
             + "sort2=[1], dir0=[ASC], dir1=[ASC], dir2=[ASC])");
   }
 
 
   @Test public void testGroupByFloorTimeWithoutLimit() {
-    final String sql = "select  floor(\"timestamp\" to MONTH) as \"month\"\n"
+    final String sql = "select cast(floor(\"timestamp\" to MONTH) as timestamp) as \"month\"\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by \"month\" DESC";
     sql(sql)
-        .explainContains("PLAN=EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        .explainContains("DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
         + "FLAG(MONTH))]], groups=[{0}], aggs=[[]], sort0=[0], dir0=[DESC])")
         .queryContains(druidChecker("'queryType':'timeseries'", "'descending':true"));
   }
 
   @Test public void testGroupByFloorTimeWithLimit() {
-    final String sql = "select  floor(\"timestamp\" to MONTH) as \"floor_month\"\n"
+    final String sql =
+        "select cast(floor(\"timestamp\" to MONTH) as timestamp) as \"floor_month\"\n"
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by \"floor_month\" DESC LIMIT 3";
-    final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$0], dir0=[DESC], fetch=[3])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], "
+    final String explain =
+        "    BindableSort(sort0=[$0], dir0=[DESC], fetch=[3])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
         + "projects=[[FLOOR($0, FLAG(MONTH))]], groups=[{0}], aggs=[[]], "
         + "sort0=[0], dir0=[DESC])";
@@ -1876,8 +1871,8 @@ public class DruidAdapterIT {
     final String expectedPlan = "PLAN=EnumerableInterpreter\n"
         + "  DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
-        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), 86400000)),"
-        + " EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), $1, $89]], groups=[{0, 1,"
+        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)),"
+        + " EXTRACT_DATE(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), $1, $89]], groups=[{0, 1,"
         + " 2}], aggs=[[SUM($3)]], sort0=[0], sort1=[1], sort2=[3], sort3=[2], dir0=[DESC], "
         + "dir1=[ASC], dir2=[DESC], dir3=[ASC], fetch=[3])";
     final String expectedDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
@@ -1912,8 +1907,8 @@ public class DruidAdapterIT {
     final String expectedPlan = "PLAN=EnumerableInterpreter\n"
         + "  DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[>=(CAST($1)"
-        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), 86400000)),"
-        + " EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret($0), 86400000)), $1, $89]], groups=[{0, 1,"
+        + ":BIGINT, 1558)], projects=[[EXTRACT_DATE(FLAG(YEAR), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)),"
+        + " EXTRACT_DATE(FLAG(MONTH), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000)), $1, $89]], groups=[{0, 1,"
         + " 2}], aggs=[[SUM($3)]], sort0=[3], sort1=[1], sort2=[2], dir0=[DESC], dir1=[DESC], "
         + "dir2=[ASC], fetch=[3])";
     final String expectedDruidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
@@ -1939,12 +1934,13 @@ public class DruidAdapterIT {
   }
 
   @Test public void testGroupByTimeSortOverMetrics() {
-    final String sqlQuery = "SELECT count(*) as c , SUM(\"unit_sales\") as s, floor(\"timestamp\""
-        + " to month) FROM \"foodmart\" group by floor(\"timestamp\" to month) order by s DESC";
+    final String sqlQuery = "SELECT count(*) as c, SUM(\"unit_sales\") as s,"
+        + " cast(floor(\"timestamp\" to month) as timestamp)"
+        + " FROM \"foodmart\" group by floor(\"timestamp\" to month) order by s DESC";
     sql(sqlQuery)
         .explainContains("PLAN=EnumerableInterpreter\n"
         + "  BindableSort(sort0=[$1], dir0=[DESC])\n"
-        + "    BindableProject(C=[$1], S=[$2], EXPR$2=[$0])\n"
+        + "    BindableProject(C=[$1], S=[$2], EXPR$2=[CAST($0):TIMESTAMP(0)])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[FLOOR($0, "
         + "FLAG(MONTH)), $89]], groups=[{0}], aggs=[[COUNT(), SUM($1)]])")
@@ -1964,8 +1960,8 @@ public class DruidAdapterIT {
   }
 
   @Test public void testNumericOrderingOfOrderByOperatorFullTime() {
-    final String sqlQuery = "SELECT \"timestamp\", count(*) as c, SUM(\"unit_sales\")  "
-        + "as s FROM "
+    final String sqlQuery = "SELECT cast(\"timestamp\" as timestamp) as \"timestamp\","
+        + " count(*) as c, SUM(\"unit_sales\") as s FROM "
         + "\"foodmart\" group by \"timestamp\" order by \"timestamp\" DESC, c DESC, s LIMIT 5";
     final String druidSubQuery = "'limitSpec':{'type':'default','limit':5,"
         + "'columns':[{'dimension':'extract','direction':'descending',"
@@ -2044,7 +2040,7 @@ public class DruidAdapterIT {
         + "\"product_id\" = 1558 group by extract(CENTURY from \"timestamp\")";
     final String plan = "PLAN=EnumerableInterpreter\n"
         + "  BindableAggregate(group=[{0}])\n"
-        + "    BindableProject(EXPR$0=[EXTRACT_DATE(FLAG(CENTURY), /INT(Reinterpret($0), 86400000))])\n"
+        + "    BindableProject(EXPR$0=[EXTRACT_DATE(FLAG(CENTURY), /INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000))])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[=($1, 1558)], "
         + "projects=[[$0]])";
@@ -2491,7 +2487,7 @@ public class DruidAdapterIT {
   @Test public void testOrderByOnMetricsInSelectDruidQuery() {
     final String sqlQuery = "select \"store_sales\" as a, \"store_cost\" as b, \"store_sales\" - "
             + "\"store_cost\" as c from \"foodmart\" where \"timestamp\" "
-            + ">= '1997-01-01 00:00:00' and \"timestamp\" < '1997-09-01 00:00:00' order by c "
+            + ">= '1997-01-01 00:00:00 UTC' and \"timestamp\" < '1997-09-01 00:00:00 UTC' order by c "
             + "limit 5";
     String postAggString = "'queryType':'select'";
     final String plan = "PLAN=EnumerableInterpreter\n"
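
A note on the recurring plan fragments above: in expressions such as
/INT(CAST(Reinterpret($0)):TIMESTAMP(0), 86400000), the constant 86400000
is the number of milliseconds in a day, so the integer division turns an
epoch-millisecond timestamp into an epoch-day count, on which EXTRACT_DATE
then operates, as the plans show. A quick arithmetic check (the class name
and sample value are illustrative, not taken from the patch):

  public class EpochDayCheck {
    public static void main(String[] args) {
      long millisPerDay = 86_400_000L;   // 24 * 60 * 60 * 1000
      long ts = 852_076_800_000L;        // 1997-01-01 00:00:00 UTC in millis
      long epochDay = Math.floorDiv(ts, millisPerDay);
      System.out.println(epochDay);      // 9862 days since 1970-01-01
    }
  }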

http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java b/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
index 74ce10c..0e1948d 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidDateRangeRulesTest.java
@@ -158,7 +158,7 @@ public class DruidDateRangeRulesTest {
               operandRanges));
     }
     final List<LocalInterval> intervals =
-        DruidDateTimeUtils.createInterval(f.timeStampDataType, e);
+        DruidDateTimeUtils.createInterval(e, "UTC");
     assertThat(intervals, notNullValue());
     assertThat(intervals.toString(), intervalMatcher);
   }
@@ -178,7 +178,7 @@ public class DruidDateRangeRulesTest {
     }
     final RexNode e2 = f.simplify.simplify(e);
     List<LocalInterval> intervals =
-        DruidDateTimeUtils.createInterval(f.timeStampDataType, e2);
+        DruidDateTimeUtils.createInterval(e2, "UTC");
     if (intervals == null) {
       throw new AssertionError("null interval");
     }
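
Taken together, the two hunks above show the new shape of
DruidDateTimeUtils.createInterval: the Druid time-zone id is now passed
explicitly as a string instead of being derived from a timestamp
RelDataType. A minimal sketch of a caller, assuming the Druid adapter's
package locations for DruidDateTimeUtils and LocalInterval (the wrapper
class and method here are hypothetical):

  import java.util.List;

  import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
  import org.apache.calcite.adapter.druid.LocalInterval;
  import org.apache.calcite.rex.RexNode;

  class CreateIntervalSketch {
    /** Converts a timestamp predicate to Druid query intervals; per the
     * test above, the result is null when the predicate cannot be
     * expressed as intervals. */
    static List<LocalInterval> intervalsFor(RexNode predicate) {
      return DruidDateTimeUtils.createInterval(predicate, "UTC");
    }
  }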

http://git-wip-us.apache.org/repos/asf/calcite/blob/939c9a62/site/_docs/reference.md
----------------------------------------------------------------------
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index 209d39a..2bc70fd 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -978,6 +978,7 @@ name will have been converted to upper case also.
 | DATE        | Date                      | Example: DATE '1969-07-20'
 | TIME        | Time of day               | Example: TIME '20:17:40'
 | TIMESTAMP [ WITHOUT TIME ZONE ] | Date and time | Example: TIMESTAMP '1969-07-20 20:17:40'
+| TIMESTAMP WITH LOCAL TIME ZONE | Date and time with local time zone | Example: TIMESTAMP WITH LOCAL TIME ZONE '1969-07-20 20:17:40'
 | TIMESTAMP WITH TIME ZONE | Date and time with time zone | Example: TIMESTAMP '1969-07-20 20:17:40 America/Los Angeles'
 | INTERVAL timeUnit [ TO timeUnit ] | Date time interval | Examples: INTERVAL '1-5' YEAR TO MONTH, INTERVAL '45' DAY, INTERVAL '1 2:34:56.789' DAY TO SECOND
 | GEOMETRY | Geometry | Examples: ST_GeomFromText('POINT (30 10)')
@@ -991,9 +992,11 @@ timeUnit:
 
 Note:
 
-* DATE, TIME and TIMESTAMP have no time zone. There is not even an implicit
-  time zone, such as UTC (as in Java) or the local time zone. It is left to
-  the user or application to supply a time zone.
+* DATE, TIME and TIMESTAMP have no time zone. For those types, there is not
+  even an implicit time zone, such as UTC (as in Java) or the local time zone.
+  It is left to the user or application to supply a time zone. Likewise,
+  TIMESTAMP WITH LOCAL TIME ZONE does not store a time zone internally, but
+  it relies on the supplied time zone to give its values correct semantics.
 * GEOMETRY is allowed only in certain
   [conformance levels]({{ site.apiRoot }}/org/apache/calcite/sql/validate/SqlConformance.html#allowGeometry--).
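
The note above is easier to see with a concrete rendering example. The
following sketch is illustrative only and uses plain java.time rather than
any Calcite API: a TIMESTAMP WITH LOCAL TIME ZONE value behaves like a
fixed instant, and the time zone is supplied at display time instead of
being stored with the value (the class name and choice of zones are
hypothetical):

  import java.time.Instant;
  import java.time.ZoneId;
  import java.time.format.DateTimeFormatter;

  public class LocalTimeZoneDemo {
    public static void main(String[] args) {
      // 1969-07-20 20:17:40 in America/Los_Angeles (PDT, UTC-7),
      // expressed as an instant.
      Instant instant = Instant.parse("1969-07-21T03:17:40Z");
      DateTimeFormatter fmt =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
      // Same stored value, two supplied zones, two renderings:
      System.out.println(
          fmt.withZone(ZoneId.of("America/Los_Angeles")).format(instant));
      // prints 1969-07-20 20:17:40
      System.out.println(fmt.withZone(ZoneId.of("UTC")).format(instant));
      // prints 1969-07-21 03:17:40
    }
  }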