You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2018/02/17 03:28:35 UTC

[4/4] calcite git commit: [CALCITE-2170] Use Druid Expressions capabilities to improve the amount of work that can be pushed to Druid

[CALCITE-2170] Use Druid Expressions capabilities to improve the amount of work that can be pushed to Druid

Close apache/calcite#624


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/98f3704e
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/98f3704e
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/98f3704e

Branch: refs/heads/master
Commit: 98f3704ea4536d6ead6465376bd02139b889f6e9
Parents: 707f4de
Author: Christian Tzolov <ch...@gmail.com>
Authored: Thu Nov 3 06:30:04 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Fri Feb 16 19:16:58 2018 -0800

----------------------------------------------------------------------
 .../adapter/druid/BinaryOperatorConversion.java |   64 +
 .../adapter/druid/CeilOperatorConversion.java   |   77 +
 .../adapter/druid/DefaultDimensionSpec.java     |   29 +-
 .../calcite/adapter/druid/DimensionSpec.java    |    8 +-
 .../adapter/druid/DirectOperatorConversion.java |   55 +
 .../adapter/druid/DruidConnectionImpl.java      |   49 +-
 .../adapter/druid/DruidDateTimeUtils.java       |   68 +
 .../calcite/adapter/druid/DruidExpressions.java |  283 +++
 .../apache/calcite/adapter/druid/DruidJson.java |   29 +
 .../calcite/adapter/druid/DruidJsonFilter.java  |  642 +++++
 .../calcite/adapter/druid/DruidQuery.java       | 2114 ++++++++--------
 .../adapter/druid/DruidResultEnumerator.java    |   25 -
 .../calcite/adapter/druid/DruidRules.java       |  626 +----
 .../adapter/druid/DruidSqlCastConverter.java    |  152 ++
 .../druid/DruidSqlOperatorConverter.java        |   49 +
 .../apache/calcite/adapter/druid/DruidType.java |   16 +-
 .../druid/ExtractOperatorConversion.java        |   80 +
 .../adapter/druid/ExtractionDimensionSpec.java  |   50 +-
 .../adapter/druid/ExtractionFunction.java       |    2 +-
 .../adapter/druid/FloorOperatorConversion.java  |   74 +
 .../calcite/adapter/druid/Granularities.java    |    4 +-
 .../calcite/adapter/druid/Granularity.java      |    2 +-
 .../adapter/druid/NaryOperatorConverter.java    |   60 +
 .../druid/SubstringOperatorConversion.java      |   63 +
 .../druid/TimeExtractionDimensionSpec.java      |   75 -
 .../adapter/druid/TimeExtractionFunction.java   |   63 +-
 .../druid/UnaryPrefixOperatorConversion.java    |   63 +
 .../druid/UnarySuffixOperatorConversion.java    |   62 +
 .../calcite/adapter/druid/VirtualColumn.java    |  100 +
 .../adapter/druid/DruidQueryFilterTest.java     |   47 +-
 .../org/apache/calcite/test/DruidAdapterIT.java | 2367 ++++++++++++------
 31 files changed, 4880 insertions(+), 2518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
new file mode 100644
index 0000000..d10c147
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import java.util.List;
+
+/**
+ * Binary operator conversion utility class used to convert expression like exp1 Operator exp2
+ */
+public class BinaryOperatorConversion implements DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidOperator;
+
+  public BinaryOperatorConversion(final SqlOperator operator, final String druidOperator) {
+    this.operator = operator;
+    this.druidOperator = druidOperator;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+
+    final RexCall call = (RexCall) rexNode;
+
+    final List<String> druidExpressions = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType,
+        call.getOperands());
+    if (druidExpressions == null) {
+      return null;
+    }
+    if (druidExpressions.size() != 2) {
+      throw new IllegalStateException(
+          DruidQuery.format("Got binary operator[%s] with %s args?", operator.getName(),
+              druidExpressions.size()));
+    }
+
+    return DruidQuery
+        .format("(%s %s %s)", druidExpressions.get(0), druidOperator, druidExpressions.get(1));
+  }
+}
+
+// End BinaryOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
new file mode 100644
index 0000000..7f15307
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+/**
+ * DruidSqlOperatorConverter implementation that handles Ceil operations conversions
+ */
+public class CeilOperatorConversion implements DruidSqlOperatorConverter {
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.CEIL;
+  }
+
+  @Nullable
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery query) {
+    final RexCall call = (RexCall) rexNode;
+    final RexNode arg = call.getOperands().get(0);
+    final String druidExpression = DruidExpressions.toDruidExpression(
+        arg,
+        rowType,
+        query);
+    if (druidExpression == null) {
+      return null;
+    } else if (call.getOperands().size() == 1) {
+      // case CEIL(expr)
+      return  DruidQuery.format("ceil(%s)", druidExpression);
+    } else if (call.getOperands().size() == 2) {
+      // CEIL(expr TO timeUnit)
+      final RexLiteral flag = (RexLiteral) call.getOperands().get(1);
+      final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
+      final Granularity.Type type = DruidDateTimeUtils.toDruidGranularity(timeUnit);
+      if (type == null) {
+        // Unknown Granularity bail out
+        return null;
+      }
+      String isoPeriodFormat = DruidDateTimeUtils.toISOPeriodFormat(type);
+      if (isoPeriodFormat == null) {
+        return null;
+      }
+      return DruidExpressions.applyTimestampCeil(
+          druidExpression,
+          isoPeriodFormat,
+          "",
+          TimeZone.getTimeZone(query.getConnectionConfig().timeZone()));
+    } else {
+      return null;
+    }
+  }
+}
+
+// End CeilOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
index 015edff..28f99da 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.adapter.druid;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 
@@ -29,17 +30,43 @@ import java.io.IOException;
 public class DefaultDimensionSpec implements DimensionSpec {
 
   private final String dimension;
+  private final String outputName;
+  private final DruidType outputType;
+
+  public DefaultDimensionSpec(String dimension, String outputName, DruidType outputType) {
+    this.dimension = Preconditions.checkNotNull(dimension);
+    this.outputName = Preconditions.checkNotNull(outputName);
+    this.outputType = outputType == null ? DruidType.STRING : outputType;
+  }
 
   public DefaultDimensionSpec(String dimension) {
-    this.dimension = dimension;
+    this(dimension, dimension, null);
   }
 
   @Override public void write(JsonGenerator generator) throws IOException {
     generator.writeStartObject();
     generator.writeStringField("type", "default");
     generator.writeStringField("dimension", dimension);
+    generator.writeStringField("outputName", outputName);
+    generator.writeStringField("outputType", outputType.name());
     generator.writeEndObject();
   }
+
+  @Override public String getOutputName() {
+    return outputName;
+  }
+
+  @Override public DruidType getOutputType() {
+    return outputType;
+  }
+
+  @Override public ExtractionFunction getExtractionFn() {
+    return null;
+  }
+
+  @Override public String getDimension() {
+    return dimension;
+  }
 }
 
 // End DefaultDimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
index 45625c3..14c02e6 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
@@ -16,12 +16,18 @@
  */
 package org.apache.calcite.adapter.druid;
 
+import javax.annotation.Nullable;
+
 /**
  * Interface for Druid DimensionSpec.
  *
  * <p>DimensionSpecs define how dimension values get transformed prior to aggregation.
  */
-public interface DimensionSpec extends DruidQuery.Json {
+public interface DimensionSpec extends DruidJson {
+  String getOutputName();
+  DruidType getOutputType();
+  @Nullable ExtractionFunction getExtractionFn();
+  String getDimension();
 }
 
 // End DimensionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
new file mode 100644
index 0000000..c937e83
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import java.util.List;
+
+/**
+ * Direct operator conversion for expression like Function(exp_1,...exp_n)
+ */
+public class DirectOperatorConversion implements DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidFunctionName;
+
+  public DirectOperatorConversion(final SqlOperator operator, final String druidFunctionName) {
+    this.operator = operator;
+    this.druidFunctionName = druidFunctionName;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    final RexCall call = (RexCall) rexNode;
+    final List<String> druidExpressions = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType,
+        call.getOperands());
+    if (druidExpressions == null) {
+      return null;
+    }
+    return DruidExpressions.functionCall(druidFunctionName, druidExpressions);
+  }
+}
+
+// End DirectOperatorConversion.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 4f65dff..40883bf 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.CollectionType;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
@@ -72,12 +73,15 @@ class DruidConnectionImpl implements DruidConnection {
 
   public static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp";
   private static final SimpleDateFormat UTC_TIMESTAMP_FORMAT;
+  private static final SimpleDateFormat TIMESTAMP_FORMAT;
 
   static {
     final TimeZone utc = DateTimeUtils.UTC_ZONE;
     UTC_TIMESTAMP_FORMAT =
         new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT);
     UTC_TIMESTAMP_FORMAT.setTimeZone(utc);
+    TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+    TIMESTAMP_FORMAT.setTimeZone(utc);
   }
 
   DruidConnectionImpl(String url, String coordinatorUrl) {
@@ -132,6 +136,10 @@ class DruidConnectionImpl implements DruidConnection {
 
     int posTimestampField = -1;
     for (int i = 0; i < fieldTypes.size(); i++) {
+      /*@TODO This need to be revisited. The logic seems implying that only
+      one column of type timestamp is present, this is not necessarily true,
+      see https://issues.apache.org/jira/browse/CALCITE-2175
+      */
       if (fieldTypes.get(i) == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) {
         posTimestampField = i;
         break;
@@ -324,30 +332,41 @@ class DruidConnectionImpl implements DruidConnection {
     }
 
     if (isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP == type) {
-      try {
-        final long timeInMillis;
-
-        if (token == JsonToken.VALUE_NUMBER_INT) {
-          timeInMillis = parser.getLongValue();
-        } else {
+      final int fieldPos = posTimestampField != -1 ? posTimestampField : i;
+      if (token == JsonToken.VALUE_NUMBER_INT) {
+        rowBuilder.set(posTimestampField, parser.getLongValue());
+        return;
+      } else {
+        // We don't have any way to figure out the format of time upfront since we only have
+        // org.apache.calcite.avatica.ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP as type to represent
+        // both timestamp and timestamp with local timezone.
+        // Logic where type is inferred can be found at DruidQuery.DruidQueryNode.getPrimitive()
+        // Thus need to guess via try and catch
+        synchronized (UTC_TIMESTAMP_FORMAT) {
           // synchronized block to avoid race condition
-          synchronized (UTC_TIMESTAMP_FORMAT) {
-            timeInMillis = UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime();
+          try {
+            //First try to pars as Timestamp with timezone.
+            rowBuilder
+                .set(fieldPos, UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime());
+          } catch (ParseException e) {
+            // swallow the exception and try timestamp format
+            try {
+              rowBuilder
+                  .set(fieldPos, TIMESTAMP_FORMAT.parse(parser.getText()).getTime());
+            } catch (ParseException e2) {
+              // unknown format should not happen
+              Throwables.propagate(e2);
+            }
           }
         }
-        if (posTimestampField != -1) {
-          rowBuilder.set(posTimestampField, timeInMillis);
-        }
-      } catch (ParseException e) {
-        // ignore bad value
+        return;
       }
-      return;
     }
 
     switch (token) {
     case VALUE_NUMBER_INT:
       if (type == null) {
-        type = ColumnMetaData.Rep.INTEGER;
+        type = ColumnMetaData.Rep.LONG;
       }
       // fall through
     case VALUE_NUMBER_FLOAT:

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 2a9851a..91f5fa4 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Range;
 import com.google.common.collect.TreeRangeSet;
 
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
 import org.slf4j.Logger;
 
@@ -46,6 +47,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
 
+import javax.annotation.Nullable;
+
 /**
  * Utilities for generating intervals from RexNode.
  */
@@ -62,6 +65,7 @@ public class DruidDateTimeUtils {
    * expression. Assumes that all the predicates in the input
    * reference a single column: the timestamp column.
    */
+  @Nullable
   public static List<Interval> createInterval(RexNode e, String timeZone) {
     final List<Range<TimestampString>> ranges =
         extractRanges(e, TimeZone.getTimeZone(timeZone), false);
@@ -111,6 +115,7 @@ public class DruidDateTimeUtils {
     return intervals;
   }
 
+  @Nullable
   protected static List<Range<TimestampString>> extractRanges(RexNode node,
       TimeZone timeZone, boolean withNot) {
     switch (node.getKind()) {
@@ -171,6 +176,7 @@ public class DruidDateTimeUtils {
     }
   }
 
+  @Nullable
   protected static List<Range<TimestampString>> leafToRanges(RexCall call,
       TimeZone timeZone, boolean withNot) {
     switch (call.getKind()) {
@@ -249,6 +255,7 @@ public class DruidDateTimeUtils {
     }
   }
 
+  @Nullable
   protected static TimestampString literalValue(RexNode node, TimeZone timeZone) {
     switch (node.getKind()) {
     case LITERAL:
@@ -318,6 +325,67 @@ public class DruidDateTimeUtils {
     return Granularities.createGranularity(timeUnit, timeZone);
   }
 
+  /**
+   * @param type Druid Granularity  to translate as period of time
+   *
+   * @return String representing the granularity as ISO8601 Period of Time, null for unknown case.
+   */
+  @Nullable
+  public static String toISOPeriodFormat(Granularity.Type type) {
+    switch (type) {
+    case SECOND:
+      return Period.seconds(1).toString();
+    case MINUTE:
+      return Period.minutes(1).toString();
+    case HOUR:
+      return Period.hours(1).toString();
+    case DAY:
+      return Period.days(1).toString();
+    case WEEK:
+      return Period.weeks(1).toString();
+    case MONTH:
+      return Period.months(1).toString();
+    case QUARTER:
+      return Period.months(3).toString();
+    case YEAR:
+      return Period.years(1).toString();
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * Translates Calcite TimeUnitRange to Druid {@link Granularity}
+   * @param timeUnit Calcite Time unit to convert
+   *
+   * @return Druid Granularity or null
+   */
+  @Nullable
+  public static Granularity.Type toDruidGranularity(TimeUnitRange timeUnit) {
+    if (timeUnit == null) {
+      return null;
+    }
+    switch (timeUnit) {
+    case YEAR:
+      return Granularity.Type.YEAR;
+    case QUARTER:
+      return Granularity.Type.QUARTER;
+    case MONTH:
+      return Granularity.Type.MONTH;
+    case WEEK:
+      return Granularity.Type.WEEK;
+    case DAY:
+      return Granularity.Type.DAY;
+    case HOUR:
+      return Granularity.Type.HOUR;
+    case MINUTE:
+      return Granularity.Type.MINUTE;
+    case SECOND:
+      return Granularity.Type.SECOND;
+    default:
+      return null;
+    }
+  }
 }
 
 // End DruidDateTimeUtils.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
new file mode 100644
index 0000000..78cfb0c
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Chars;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+/**
+ * Expression utility class to transform Calcite expressions to Druid expressions when possible.
+ */
+public class DruidExpressions {
+
+  /**
+   * Type mapping between Calcite SQL family types and native Druid expression types
+   */
+  static final Map<SqlTypeName, DruidType> EXPRESSION_TYPES;
+  /**
+   * Druid expression safe chars, must be sorted.
+   */
+  private static final char[] SAFE_CHARS = " ,._-;:(){}[]<>!@#$%^&*`~?/".toCharArray();
+
+  static {
+    final ImmutableMap.Builder<SqlTypeName, DruidType> builder = ImmutableMap.builder();
+
+    for (SqlTypeName type : SqlTypeName.FRACTIONAL_TYPES) {
+      builder.put(type, DruidType.DOUBLE);
+    }
+
+    for (SqlTypeName type : SqlTypeName.INT_TYPES) {
+      builder.put(type, DruidType.LONG);
+    }
+
+    for (SqlTypeName type : SqlTypeName.STRING_TYPES) {
+      builder.put(type, DruidType.STRING);
+    }
+    // Timestamps are treated as longs (millis since the epoch) in Druid expressions.
+    builder.put(SqlTypeName.TIMESTAMP, DruidType.LONG);
+    builder.put(SqlTypeName.DATE, DruidType.LONG);
+    builder.put(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, DruidType.LONG);
+    builder.put(SqlTypeName.OTHER, DruidType.COMPLEX);
+    EXPRESSION_TYPES = builder.build();
+    // Safe chars must be sorted
+    Arrays.sort(SAFE_CHARS);
+  }
+  private DruidExpressions() {
+  }
+
+
+  /**
+   * Translates Calcite rexNode to Druid Expression when possible
+   * @param rexNode rexNode to convert to a Druid Expression
+   * @param inputRowType input row type of the rexNode to translate
+   * @param druidRel Druid query
+   *
+   * @return Druid Expression or null when can not convert the RexNode
+   */
+  @Nullable
+  public static String toDruidExpression(
+      final RexNode rexNode,
+      final RelDataType inputRowType,
+      final DruidQuery druidRel) {
+    SqlKind kind = rexNode.getKind();
+    SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
+
+    if (kind == SqlKind.INPUT_REF) {
+      final RexInputRef ref = (RexInputRef) rexNode;
+      final String columnName = inputRowType.getFieldNames().get(ref.getIndex());
+      if (columnName == null) {
+        return null;
+      }
+      if (druidRel.getDruidTable().timestampFieldName.equals(columnName)) {
+        return DruidExpressions.fromColumn(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+      }
+      return DruidExpressions.fromColumn(columnName);
+    }
+
+    if (rexNode instanceof RexCall) {
+      final SqlOperator operator = ((RexCall) rexNode).getOperator();
+      final DruidSqlOperatorConverter conversion = druidRel.getOperatorConversionMap()
+          .get(operator);
+      if (conversion == null) {
+        //unknown operator can not translate
+        return null;
+      } else {
+        return conversion.toDruidExpression(rexNode, inputRowType, druidRel);
+      }
+    }
+    if (kind == SqlKind.LITERAL) {
+      // Translate literal.
+      if (RexLiteral.isNullLiteral(rexNode)) {
+        //case the filter/project might yield to unknown let Calcite deal with this for now
+        return null;
+      } else if (SqlTypeName.NUMERIC_TYPES.contains(sqlTypeName)) {
+        return DruidExpressions.numberLiteral((Number) RexLiteral
+            .value(rexNode));
+      } else if (SqlTypeFamily.INTERVAL_DAY_TIME == sqlTypeName.getFamily()) {
+        // Calcite represents DAY-TIME intervals in milliseconds.
+        final long milliseconds = ((Number) RexLiteral.value(rexNode)).longValue();
+        return DruidExpressions.numberLiteral(milliseconds);
+      } else if (SqlTypeFamily.INTERVAL_YEAR_MONTH == sqlTypeName.getFamily()) {
+        // Calcite represents YEAR-MONTH intervals in months.
+        final long months = ((Number) RexLiteral.value(rexNode)).longValue();
+        return DruidExpressions.numberLiteral(months);
+      } else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) {
+        return
+            DruidExpressions.stringLiteral(RexLiteral.stringValue(rexNode));
+      } else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName
+          || SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE == sqlTypeName) {
+        return DruidExpressions.numberLiteral(DruidDateTimeUtils
+            .literalValue(rexNode, TimeZone.getTimeZone(druidRel.getConnectionConfig().timeZone()))
+            .getMillisSinceEpoch());
+      } else if (SqlTypeName.BOOLEAN == sqlTypeName) {
+        return DruidExpressions.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0);
+      }
+    }
+    // Not Literal/InputRef/RexCall or unknown type?
+    return null;
+  }
+
+  public static String fromColumn(String columnName) {
+    return DruidQuery.format("\"%s\"", columnName);
+  }
+
+  public static String nullLiteral() {
+    return "null";
+  }
+
+  public static String numberLiteral(final Number n) {
+    return n == null ? nullLiteral() : n.toString();
+  }
+
+  public static String stringLiteral(final String s) {
+    return s == null ? nullLiteral() : "'" + escape(s) + "'";
+  }
+
+  private static String escape(final String s) {
+    final StringBuilder escaped = new StringBuilder();
+    for (int i = 0; i < s.length(); i++) {
+      final char c = s.charAt(i);
+      if (Character.isLetterOrDigit(c) || Arrays.binarySearch(SAFE_CHARS, c) >= 0) {
+        escaped.append(c);
+      } else {
+        escaped.append("\\u").append(BaseEncoding.base16().encode(Chars.toByteArray(c)));
+      }
+    }
+    return escaped.toString();
+  }
+
+  public static String functionCall(final String functionName, final List<String> args) {
+    Preconditions.checkNotNull(functionName, "druid functionName");
+    Preconditions.checkNotNull(args, "args");
+
+    final StringBuilder builder = new StringBuilder(functionName);
+    builder.append("(");
+    for (int i = 0; i < args.size(); i++) {
+      final String arg = Preconditions.checkNotNull(args.get(i), "arg #%s", i);
+      builder.append(arg);
+      if (i < args.size() - 1) {
+        builder.append(",");
+      }
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+
+  public static String nAryOperatorCall(final String druidOperator, final List<String> args) {
+    Preconditions.checkNotNull(druidOperator, "druid operator missing");
+    Preconditions.checkNotNull(args, "args");
+    final StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    for (int i = 0; i < args.size(); i++) {
+      final String arg = Preconditions.checkNotNull(args.get(i), "arg #%s", i);
+      builder.append(arg);
+      if (i < args.size() - 1) {
+        builder.append(druidOperator);
+      }
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+
+  /**
+   * Translate a list of Calcite {@code RexNode} to Druid expressions.
+   *
+   * @param rexNodes list of Calcite expressions meant to be applied on top of the rows
+   *
+   * @return list of Druid expressions in the same order as rexNodes, or null if not possible.
+   * If a non-null list is returned, all elements will be non-null.
+   */
+  @Nullable
+  public static List<String> toDruidExpressions(
+      final DruidQuery druidRel, final RelDataType rowType,
+      final List<RexNode> rexNodes) {
+    final List<String> retVal = new ArrayList<>(rexNodes.size());
+    for (RexNode rexNode : rexNodes) {
+      final String druidExpression = toDruidExpression(rexNode, rowType, druidRel);
+      if (druidExpression == null) {
+        return null;
+      }
+
+      retVal.add(druidExpression);
+    }
+    return retVal;
+  }
+
+  public static String applyTimestampFloor(
+      final String input,
+      final String granularity,
+      final String origin,
+      final TimeZone timeZone) {
+    Preconditions.checkNotNull(input, "input");
+    Preconditions.checkNotNull(granularity, "granularity");
+    return DruidExpressions.functionCall(
+        "timestamp_floor",
+        ImmutableList.of(input,
+            DruidExpressions.stringLiteral(granularity),
+            DruidExpressions.stringLiteral(origin),
+            DruidExpressions.stringLiteral(timeZone.getID())));
+  }
+
+  public static String applyTimestampCeil(
+      final String input,
+      final String granularity,
+      final String origin,
+      final TimeZone timeZone) {
+    Preconditions.checkNotNull(input, "input");
+    Preconditions.checkNotNull(granularity, "granularity");
+    return DruidExpressions.functionCall(
+        "timestamp_ceil",
+        ImmutableList.of(input,
+            DruidExpressions.stringLiteral(granularity),
+            DruidExpressions.stringLiteral(origin),
+            DruidExpressions.stringLiteral(timeZone.getID())));
+  }
+
+
+  public static String applyTimeExtract(String timeExpression, String druidUnit,
+      TimeZone timeZone) {
+    return DruidExpressions.functionCall(
+        "timestamp_extract",
+        ImmutableList.of(
+            timeExpression,
+            DruidExpressions.stringLiteral(druidUnit),
+            DruidExpressions.stringLiteral(timeZone.getID())));
+  }
+}
+
+// End DruidExpressions.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
new file mode 100644
index 0000000..77ccf4f
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+
+/** Object that knows how to write itself to a
+ * {@link com.fasterxml.jackson.core.JsonGenerator}. */
+public interface DruidJson {
+  void write(JsonGenerator generator) throws IOException;
+}
+
+// End DruidJson.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
new file mode 100644
index 0000000..11ec2be
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimestampString;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Filter element of a Druid "groupBy" or "topN" query.
+ */
+abstract class DruidJsonFilter implements DruidJson {
+
+  /**
+   * @param rexNode    rexNode to translate to Druid Json Filter
+   * @param rowType    rowType associated to rexNode
+   * @param druidQuery druid query
+   *
+   * @return Druid Json filter or null if it can not translate
+   */
+  @Nullable
+  private static DruidJsonFilter toEqualityKindDruidFilter(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    if (rexNode.getKind() != SqlKind.EQUALS && rexNode.getKind() != SqlKind.NOT_EQUALS) {
+      throw new AssertionError(
+          DruidQuery.format("Expecting EQUALS or NOT_EQUALS but got [%s]", rexNode.getKind()));
+    }
+    final RexCall rexCall = (RexCall) rexNode;
+    if (rexCall.getOperands().size() < 2) {
+      return null;
+    }
+    final RexLiteral rexLiteral;
+    final RexNode refNode;
+    final RexNode lhs = rexCall.getOperands().get(0);
+    final RexNode rhs = rexCall.getOperands().get(1);
+    if (lhs.getKind() == SqlKind.LITERAL && rhs.getKind() != SqlKind.LITERAL) {
+      rexLiteral = (RexLiteral) lhs;
+      refNode = rhs;
+    } else if (rhs.getKind() == SqlKind.LITERAL && lhs.getKind() != SqlKind.LITERAL) {
+      rexLiteral = (RexLiteral) rhs;
+      refNode = lhs;
+    } else {
+      // must have at least one literal
+      return null;
+    }
+
+    if (RexLiteral.isNullLiteral(rexLiteral)) {
+      // we are not handling is NULL filter here thus we bail out if Literal is null
+      return null;
+    }
+    final String literalValue = toDruidLiteral(rexLiteral, rowType, druidQuery);
+    if (literalValue == null) {
+      // can not translate literal better bail out
+      return null;
+    }
+    final boolean isNumeric = refNode.getType().getFamily() == SqlTypeFamily.NUMERIC
+        || rexLiteral.getType().getFamily() == SqlTypeFamily.NUMERIC;
+    final Pair<String, ExtractionFunction> druidColumn = DruidQuery.toDruidColumn(refNode, rowType,
+        druidQuery);
+    final String columnName = druidColumn.left;
+    final ExtractionFunction extractionFunction = druidColumn.right;
+    if (columnName == null) {
+      // no column name better bail out.
+      return null;
+    }
+    final DruidJsonFilter partialFilter;
+    if (isNumeric) {
+      //need bound filter since it one of operands is numeric
+      partialFilter = new JsonBound(columnName, literalValue, false, literalValue, false, true,
+          extractionFunction);
+    } else {
+      partialFilter = new JsonSelector(columnName, literalValue, extractionFunction);
+    }
+
+    if (rexNode.getKind() == SqlKind.EQUALS) {
+      return partialFilter;
+    }
+    return toNotDruidFilter(partialFilter);
+  }
+
+
+  /**
+   * @param rexNode    rexNode to translate
+   * @param rowType    row type associated to Filter
+   * @param druidQuery druid query
+   *
+   * @return valid Druid Json Bound Filter or null if it can not translate the rexNode.
+   */
+  @Nullable
+  private static DruidJsonFilter toBoundDruidFilter(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    final RexCall rexCall = (RexCall) rexNode;
+    final RexLiteral rexLiteral;
+    if (rexCall.getOperands().size() < 2) {
+      return null;
+    }
+    final RexNode refNode;
+    final RexNode lhs = rexCall.getOperands().get(0);
+    final RexNode rhs = rexCall.getOperands().get(1);
+    final boolean lhsIsRef;
+    if (lhs.getKind() == SqlKind.LITERAL && rhs.getKind() != SqlKind.LITERAL) {
+      rexLiteral = (RexLiteral) lhs;
+      refNode = rhs;
+      lhsIsRef = false;
+    } else if (rhs.getKind() == SqlKind.LITERAL && lhs.getKind() != SqlKind.LITERAL) {
+      rexLiteral = (RexLiteral) rhs;
+      refNode = lhs;
+      lhsIsRef = true;
+    } else {
+      // must have at least one literal
+      return null;
+    }
+
+    if (RexLiteral.isNullLiteral(rexLiteral)) {
+      // we are not handling is NULL filter here thus we bail out if Literal is null
+      return null;
+    }
+    final String literalValue = DruidJsonFilter.toDruidLiteral(rexLiteral, rowType, druidQuery);
+    if (literalValue == null) {
+      // can not translate literal better bail out
+      return null;
+    }
+    final boolean isNumeric = refNode.getType().getFamily() == SqlTypeFamily.NUMERIC
+        || rexLiteral.getType().getFamily() == SqlTypeFamily.NUMERIC;
+    final Pair<String, ExtractionFunction> druidColumn = DruidQuery.toDruidColumn(refNode, rowType,
+        druidQuery);
+    final String columnName = druidColumn.left;
+    final ExtractionFunction extractionFunction = druidColumn.right;
+    if (columnName == null) {
+      // no column name better bail out.
+      return null;
+    }
+    switch (rexCall.getKind()) {
+    case LESS_THAN_OR_EQUAL:
+    case LESS_THAN:
+      if (lhsIsRef) {
+        return new JsonBound(columnName, null, false, literalValue,
+            rexCall.getKind() == SqlKind.LESS_THAN, isNumeric,
+            extractionFunction);
+      } else {
+        return new JsonBound(columnName, literalValue, rexCall.getKind() == SqlKind.LESS_THAN, null,
+            false, isNumeric,
+            extractionFunction);
+      }
+    case GREATER_THAN_OR_EQUAL:
+    case GREATER_THAN:
+      if (!lhsIsRef) {
+        return new JsonBound(columnName, null, false, literalValue,
+            rexCall.getKind() == SqlKind.GREATER_THAN, isNumeric,
+            extractionFunction);
+      } else {
+        return new JsonBound(columnName, literalValue, rexCall.getKind() == SqlKind.GREATER_THAN,
+            null,
+            false, isNumeric,
+            extractionFunction);
+      }
+    default:
+      return null;
+    }
+
+  }
+
+  /**
+   * @param rexNode    rexNode to translate to Druid literal equivalante
+   * @param rowType    rowType associated to rexNode
+   * @param druidQuery druid Query
+   *
+   * @return non null string or null if it can not translate to valid Druid equivalent
+   */
+  @Nullable
+  private static String toDruidLiteral(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    final SimpleDateFormat dateFormatter = new SimpleDateFormat(
+        TimeExtractionFunction.ISO_TIME_FORMAT,
+        Locale.ROOT);
+    final String timeZone = druidQuery.getConnectionConfig().timeZone();
+    if (timeZone != null) {
+      dateFormatter.setTimeZone(TimeZone.getTimeZone(timeZone));
+    }
+    final String val;
+    final RexLiteral rhsLiteral = (RexLiteral) rexNode;
+    if (SqlTypeName.NUMERIC_TYPES.contains(rhsLiteral.getTypeName())) {
+      val = String.valueOf(RexLiteral.value(rhsLiteral));
+    } else if (SqlTypeName.CHAR_TYPES.contains(rhsLiteral.getTypeName())) {
+      val = String.valueOf(RexLiteral.stringValue(rhsLiteral));
+    } else if (SqlTypeName.TIMESTAMP == rhsLiteral.getTypeName() || SqlTypeName.DATE == rhsLiteral
+        .getTypeName() || SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE == rhsLiteral.getTypeName()) {
+      TimestampString timestampString = DruidDateTimeUtils
+          .literalValue(rexNode, TimeZone.getTimeZone(timeZone));
+      if (timestampString == null) {
+        throw new AssertionError(
+            "Cannot translate Literal" + rexNode + " of type "
+                + rhsLiteral.getTypeName() + " to TimestampString");
+      }
+      //@TODO this is unnecessary we can send time as Long (eg millis since epoch) to druid
+      val = dateFormatter.format(timestampString.getMillisSinceEpoch());
+    } else {
+      // Don't know how to filter on this kind of literal.
+      val = null;
+    }
+    return val;
+  }
+
+  @Nullable
+  private static DruidJsonFilter toIsNullKindDruidFilter(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    if (rexNode.getKind() != SqlKind.IS_NULL && rexNode.getKind() != SqlKind.IS_NOT_NULL) {
+      throw new AssertionError(
+          DruidQuery.format("Expecting IS_NULL or IS_NOT_NULL but got [%s]", rexNode.getKind()));
+    }
+    final RexCall rexCall = (RexCall) rexNode;
+    final RexNode refNode = rexCall.getOperands().get(0);
+    Pair<String, ExtractionFunction> druidColumn = DruidQuery
+        .toDruidColumn(refNode, rowType, druidQuery);
+    final String columnName = druidColumn.left;
+    final ExtractionFunction extractionFunction = druidColumn.right;
+    if (columnName == null) {
+      return null;
+    }
+    if (rexNode.getKind() == SqlKind.IS_NOT_NULL) {
+      return toNotDruidFilter(new JsonSelector(columnName, null, extractionFunction));
+    }
+    return new JsonSelector(columnName, null, extractionFunction);
+  }
+
+  @Nullable
+  private static DruidJsonFilter toInKindDruidFilter(RexNode e, RelDataType rowType,
+      DruidQuery druidQuery) {
+    if (e.getKind() != SqlKind.IN && e.getKind() != SqlKind.NOT_IN) {
+      throw new AssertionError(
+          DruidQuery.format("Expecting IN or NOT IN but got [%s]", e.getKind()));
+    }
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (RexNode rexNode : ((RexCall) e).getOperands()) {
+      if (rexNode.getKind() == SqlKind.LITERAL) {
+        String value = toDruidLiteral(rexNode, rowType, druidQuery);
+        if (value == null) {
+          return null;
+        }
+        listBuilder.add(value);
+      }
+    }
+    Pair<String, ExtractionFunction> druidColumn = DruidQuery
+        .toDruidColumn(((RexCall) e).getOperands().get(0),
+        rowType, druidQuery);
+    final String columnName = druidColumn.left;
+    final ExtractionFunction extractionFunction = druidColumn.right;
+    if (columnName == null) {
+      return null;
+    }
+    if (e.getKind() != SqlKind.NOT_IN) {
+      return new DruidJsonFilter.JsonInFilter(columnName, listBuilder.build(), extractionFunction);
+    } else {
+      return toNotDruidFilter(
+          new DruidJsonFilter.JsonInFilter(columnName, listBuilder.build(), extractionFunction));
+    }
+  }
+
+  @Nullable
+  protected static DruidJsonFilter toNotDruidFilter(DruidJsonFilter druidJsonFilter) {
+    if (druidJsonFilter == null) {
+      return null;
+    }
+    return new JsonCompositeFilter(Type.NOT, druidJsonFilter);
+  }
+
+  @Nullable
+  private static DruidJsonFilter toBetweenDruidFilter(RexNode rexNode, RelDataType rowType,
+      DruidQuery query) {
+    if (rexNode.getKind() != SqlKind.BETWEEN) {
+      return null;
+    }
+    final RexCall rexCall = (RexCall) rexNode;
+    if (rexCall.getOperands().size() < 4) {
+      return null;
+    }
+    // BETWEEN (ASYMMETRIC, REF, 'lower-bound', 'upper-bound')
+    final RexNode refNode = rexCall.getOperands().get(1);
+    final RexNode lhs = rexCall.getOperands().get(2);
+    final RexNode rhs = rexCall.getOperands().get(3);
+
+    final String lhsLiteralValue = toDruidLiteral(lhs, rowType, query);
+    final String rhsLiteralValue = toDruidLiteral(rhs, rowType, query);
+    if (lhsLiteralValue == null || rhsLiteralValue == null) {
+      return null;
+    }
+    final boolean isNumeric = lhs.getType().getFamily() == SqlTypeFamily.NUMERIC
+        || lhs.getType().getFamily() == SqlTypeFamily.NUMERIC;
+    final Pair<String, ExtractionFunction> druidColumn = DruidQuery
+        .toDruidColumn(refNode, rowType, query);
+    final String columnName = druidColumn.left;
+    final ExtractionFunction extractionFunction = druidColumn.right;
+
+    if (columnName == null) {
+      return null;
+    }
+    return new JsonBound(columnName, lhsLiteralValue, false, rhsLiteralValue,
+        false, isNumeric,
+        extractionFunction);
+
+  }
+
+  @Nullable
+  private static DruidJsonFilter toSimpleDruidFilter(RexNode e, RelDataType rowType,
+      DruidQuery druidQuery) {
+    switch (e.getKind()) {
+    case EQUALS:
+    case NOT_EQUALS:
+      return toEqualityKindDruidFilter(e, rowType, druidQuery);
+    case GREATER_THAN:
+    case GREATER_THAN_OR_EQUAL:
+    case LESS_THAN:
+    case LESS_THAN_OR_EQUAL:
+      return toBoundDruidFilter(e, rowType, druidQuery);
+    case BETWEEN:
+      return toBetweenDruidFilter(e, rowType, druidQuery);
+    case IN:
+    case NOT_IN:
+      return toInKindDruidFilter(e, rowType, druidQuery);
+    case IS_NULL:
+    case IS_NOT_NULL:
+      return toIsNullKindDruidFilter(e, rowType, druidQuery);
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * @param rexNode    rexNode to translate to Druid Filter
+   * @param rowType    rowType of filter input
+   * @param druidQuery Druid query
+   *
+   * @return Druid Json Filters or null when can not translate to valid Druid Filters.
+   */
+  @Nullable
+  static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    if (rexNode.isAlwaysTrue()) {
+      return JsonExpressionFilter.alwaysTrue();
+    }
+    if (rexNode.isAlwaysFalse()) {
+      return JsonExpressionFilter.alwaysFalse();
+    }
+    switch (rexNode.getKind()) {
+    case IS_TRUE:
+    case IS_NOT_FALSE:
+      return toDruidFilters(Iterables.getOnlyElement(((RexCall) rexNode).getOperands()), rowType,
+          druidQuery);
+    case IS_NOT_TRUE:
+    case IS_FALSE:
+      final DruidJsonFilter simpleFilter = toDruidFilters(Iterables
+          .getOnlyElement(((RexCall) rexNode).getOperands()), rowType, druidQuery);
+      return simpleFilter != null ? new JsonCompositeFilter(Type.NOT, simpleFilter)
+          : simpleFilter;
+    case AND:
+    case OR:
+    case NOT:
+      final RexCall call = (RexCall) rexNode;
+      final List<DruidJsonFilter> jsonFilters = Lists.newArrayList();
+      for (final RexNode e : call.getOperands()) {
+        final DruidJsonFilter druidFilter = toDruidFilters(e, rowType, druidQuery);
+        if (druidFilter == null) {
+          return null;
+        }
+        jsonFilters.add(druidFilter);
+      }
+      return new JsonCompositeFilter(Type.valueOf(rexNode.getKind().name()),
+          jsonFilters);
+    }
+
+    final DruidJsonFilter simpleLeafFilter = toSimpleDruidFilter(rexNode, rowType, druidQuery);
+    return simpleLeafFilter == null
+        ? toDruidExpressionFilter(rexNode, rowType, druidQuery)
+        : simpleLeafFilter;
+  }
+
+  @Nullable
+  private static DruidJsonFilter toDruidExpressionFilter(RexNode rexNode, RelDataType rowType,
+      DruidQuery query) {
+    final String expression = DruidExpressions.toDruidExpression(rexNode, rowType, query);
+    return expression == null ? null : new JsonExpressionFilter(expression);
+  }
+
+  /**
+   * Supported filter types
+   */
+  protected enum Type {
+    AND,
+    OR,
+    NOT,
+    SELECTOR,
+    IN,
+    BOUND,
+    EXPRESSION;
+
+    public String lowercase() {
+      return name().toLowerCase(Locale.ROOT);
+    }
+  }
+
+  protected final Type type;
+
+  private DruidJsonFilter(Type type) {
+    this.type = type;
+  }
+
+  /**
+   * Druid Expression filter.
+   */
+  public static class JsonExpressionFilter extends DruidJsonFilter {
+    private final String expression;
+
+    JsonExpressionFilter(String expression) {
+      super(Type.EXPRESSION);
+      this.expression = Preconditions.checkNotNull(expression);
+    }
+
+    @Override public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type.lowercase());
+      generator.writeStringField("expression", expression);
+      generator.writeEndObject();
+    }
+
+    /**
+     * We need to push to Druid an expression that always evaluates to true.
+     */
+    private static JsonExpressionFilter alwaysTrue() {
+      return new JsonExpressionFilter("1 == 1");
+    }
+
+    /**
+     * We need to push to Druid an expression that always evaluates to false.
+     */
+    private static JsonExpressionFilter alwaysFalse() {
+      return new JsonExpressionFilter("1 == 2");
+    }
+  }
+
+  /**
+   * Equality filter.
+   */
+  private static class JsonSelector extends DruidJsonFilter {
+    private final String dimension;
+
+    private final String value;
+
+    private final ExtractionFunction extractionFunction;
+
+    private JsonSelector(String dimension, String value,
+        ExtractionFunction extractionFunction) {
+      super(Type.SELECTOR);
+      this.dimension = dimension;
+      this.value = value;
+      this.extractionFunction = extractionFunction;
+    }
+
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type.lowercase());
+      generator.writeStringField("dimension", dimension);
+      generator.writeStringField("value", value);
+      DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Bound filter.
+   */
+  @VisibleForTesting
+  protected static class JsonBound extends DruidJsonFilter {
+    private final String dimension;
+
+    private final String lower;
+
+    private final boolean lowerStrict;
+
+    private final String upper;
+
+    private final boolean upperStrict;
+
+    private final boolean alphaNumeric;
+
+    private final ExtractionFunction extractionFunction;
+
+    protected JsonBound(String dimension, String lower,
+        boolean lowerStrict, String upper, boolean upperStrict,
+        boolean alphaNumeric, ExtractionFunction extractionFunction) {
+      super(Type.BOUND);
+      this.dimension = dimension;
+      this.lower = lower;
+      this.lowerStrict = lowerStrict;
+      this.upper = upper;
+      this.upperStrict = upperStrict;
+      this.alphaNumeric = alphaNumeric;
+      this.extractionFunction = extractionFunction;
+    }
+
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type.lowercase());
+      generator.writeStringField("dimension", dimension);
+      if (lower != null) {
+        generator.writeStringField("lower", lower);
+        generator.writeBooleanField("lowerStrict", lowerStrict);
+      }
+      if (upper != null) {
+        generator.writeStringField("upper", upper);
+        generator.writeBooleanField("upperStrict", upperStrict);
+      }
+      if (alphaNumeric) {
+        generator.writeStringField("ordering", "numeric");
+      } else {
+        generator.writeStringField("ordering", "lexicographic");
+      }
+      DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Filter that combines other filters using a boolean operator.
+   */
+  private static class JsonCompositeFilter extends DruidJsonFilter {
+    private final List<? extends DruidJsonFilter> fields;
+
+    private JsonCompositeFilter(Type type,
+        Iterable<? extends DruidJsonFilter> fields) {
+      super(type);
+      this.fields = ImmutableList.copyOf(fields);
+    }
+
+    private JsonCompositeFilter(Type type, DruidJsonFilter... fields) {
+      this(type, ImmutableList.copyOf(fields));
+    }
+
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type.lowercase());
+      switch (type) {
+      case NOT:
+        DruidQuery.writeField(generator, "field", fields.get(0));
+        break;
+      default:
+        DruidQuery.writeField(generator, "fields", fields);
+      }
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * IN filter.
+   */
+  protected static class JsonInFilter extends DruidJsonFilter {
+    private final String dimension;
+
+    private final List<String> values;
+
+    private final ExtractionFunction extractionFunction;
+
+    protected JsonInFilter(String dimension, List<String> values,
+        ExtractionFunction extractionFunction) {
+      super(Type.IN);
+      this.dimension = dimension;
+      this.values = values;
+      this.extractionFunction = extractionFunction;
+    }
+
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type.lowercase());
+      generator.writeStringField("dimension", dimension);
+      DruidQuery.writeField(generator, "values", values);
+      DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+      generator.writeEndObject();
+    }
+  }
+
+  public static DruidJsonFilter getSelectorFilter(String column, String value,
+      ExtractionFunction extractionFunction) {
+    Preconditions.checkNotNull(column);
+    return new JsonSelector(column, value, extractionFunction);
+  }
+
+  /**
+   * Druid Having Filter spec
+   */
+  protected static class JsonDimHavingFilter implements DruidJson {
+
+    private final DruidJsonFilter filter;
+
+    public JsonDimHavingFilter(DruidJsonFilter filter) {
+      this.filter = filter;
+    }
+
+    @Override public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", "filter");
+      DruidQuery.writeField(generator, "filter", filter);
+      generator.writeEndObject();
+    }
+  }
+}
+
+// End DruidJsonFilter.java