You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2018/02/17 03:28:35 UTC
[4/4] calcite git commit: [CALCITE-2170] Use Druid Expressions
capabilities to improve the amount of work that can be pushed to Druid
[CALCITE-2170] Use Druid Expressions capabilities to improve the amount of work that can be pushed to Druid
Close apache/calcite#624
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/98f3704e
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/98f3704e
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/98f3704e
Branch: refs/heads/master
Commit: 98f3704ea4536d6ead6465376bd02139b889f6e9
Parents: 707f4de
Author: Christian Tzolov <ch...@gmail.com>
Authored: Thu Nov 3 06:30:04 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Fri Feb 16 19:16:58 2018 -0800
----------------------------------------------------------------------
.../adapter/druid/BinaryOperatorConversion.java | 64 +
.../adapter/druid/CeilOperatorConversion.java | 77 +
.../adapter/druid/DefaultDimensionSpec.java | 29 +-
.../calcite/adapter/druid/DimensionSpec.java | 8 +-
.../adapter/druid/DirectOperatorConversion.java | 55 +
.../adapter/druid/DruidConnectionImpl.java | 49 +-
.../adapter/druid/DruidDateTimeUtils.java | 68 +
.../calcite/adapter/druid/DruidExpressions.java | 283 +++
.../apache/calcite/adapter/druid/DruidJson.java | 29 +
.../calcite/adapter/druid/DruidJsonFilter.java | 642 +++++
.../calcite/adapter/druid/DruidQuery.java | 2114 ++++++++--------
.../adapter/druid/DruidResultEnumerator.java | 25 -
.../calcite/adapter/druid/DruidRules.java | 626 +----
.../adapter/druid/DruidSqlCastConverter.java | 152 ++
.../druid/DruidSqlOperatorConverter.java | 49 +
.../apache/calcite/adapter/druid/DruidType.java | 16 +-
.../druid/ExtractOperatorConversion.java | 80 +
.../adapter/druid/ExtractionDimensionSpec.java | 50 +-
.../adapter/druid/ExtractionFunction.java | 2 +-
.../adapter/druid/FloorOperatorConversion.java | 74 +
.../calcite/adapter/druid/Granularities.java | 4 +-
.../calcite/adapter/druid/Granularity.java | 2 +-
.../adapter/druid/NaryOperatorConverter.java | 60 +
.../druid/SubstringOperatorConversion.java | 63 +
.../druid/TimeExtractionDimensionSpec.java | 75 -
.../adapter/druid/TimeExtractionFunction.java | 63 +-
.../druid/UnaryPrefixOperatorConversion.java | 63 +
.../druid/UnarySuffixOperatorConversion.java | 62 +
.../calcite/adapter/druid/VirtualColumn.java | 100 +
.../adapter/druid/DruidQueryFilterTest.java | 47 +-
.../org/apache/calcite/test/DruidAdapterIT.java | 2367 ++++++++++++------
31 files changed, 4880 insertions(+), 2518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
new file mode 100644
index 0000000..d10c147
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/BinaryOperatorConversion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import java.util.List;
+
+/**
+ * Converts a binary Calcite call, i.e. an expression of the shape
+ * {@code exp1 operator exp2}, into the equivalent infix Druid expression.
+ */
+public class BinaryOperatorConversion implements DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidOperator;
+
+  /**
+   * @param operator Calcite operator handled by this conversion
+   * @param druidOperator operator symbol placed between the two translated operands
+   */
+  public BinaryOperatorConversion(final SqlOperator operator, final String druidOperator) {
+    this.operator = operator;
+    this.druidOperator = druidOperator;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  /**
+   * Renders the call as {@code (left druidOperator right)}.
+   *
+   * @return the Druid expression, or null when an operand cannot be translated
+   * @throws IllegalStateException if the call does not have exactly two operands
+   */
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    final List<RexNode> operands = ((RexCall) rexNode).getOperands();
+    final List<String> translated =
+        DruidExpressions.toDruidExpressions(druidQuery, rowType, operands);
+    if (translated == null) {
+      // at least one operand has no Druid counterpart
+      return null;
+    }
+
+    final int argCount = translated.size();
+    if (argCount == 2) {
+      return DruidQuery.format("(%s %s %s)",
+          translated.get(0), druidOperator, translated.get(1));
+    }
+    throw new IllegalStateException(
+        DruidQuery.format("Got binary operator[%s] with %s args?", operator.getName(),
+            argCount));
+  }
+}
+
+// End BinaryOperatorConversion.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
new file mode 100644
index 0000000..7f15307
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/CeilOperatorConversion.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DruidSqlOperatorConverter} that translates Calcite {@code CEIL} calls: the
+ * one-operand form {@code CEIL(expr)} and the two-operand form
+ * {@code CEIL(expr TO timeUnit)}.
+ */
+public class CeilOperatorConversion implements DruidSqlOperatorConverter {
+  @Override public SqlOperator calciteOperator() {
+    return SqlStdOperatorTable.CEIL;
+  }
+
+  /**
+   * Translates a {@code CEIL} call.
+   *
+   * @return {@code ceil(expr)} for one operand, a {@code timestamp_ceil} call for two
+   * operands, or null when the operand, the time unit or the operand count is not supported
+   */
+  @Nullable
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery query) {
+    final RexCall ceilCall = (RexCall) rexNode;
+    final String operand = DruidExpressions.toDruidExpression(
+        ceilCall.getOperands().get(0), rowType, query);
+    if (operand == null) {
+      return null;
+    }
+
+    switch (ceilCall.getOperands().size()) {
+    case 1:
+      // case CEIL(expr)
+      return DruidQuery.format("ceil(%s)", operand);
+    case 2:
+      // CEIL(expr TO timeUnit)
+      final RexLiteral unitLiteral = (RexLiteral) ceilCall.getOperands().get(1);
+      final Granularity.Type granularity =
+          DruidDateTimeUtils.toDruidGranularity((TimeUnitRange) unitLiteral.getValue());
+      if (granularity == null) {
+        // Unknown Granularity bail out
+        return null;
+      }
+      final String period = DruidDateTimeUtils.toISOPeriodFormat(granularity);
+      if (period == null) {
+        return null;
+      }
+      // the origin argument is passed as an empty string
+      return DruidExpressions.applyTimestampCeil(
+          operand,
+          period,
+          "",
+          TimeZone.getTimeZone(query.getConnectionConfig().timeZone()));
+    default:
+      return null;
+    }
+  }
+}
+
+// End CeilOperatorConversion.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
index 015edff..28f99da 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DefaultDimensionSpec.java
@@ -17,6 +17,7 @@
package org.apache.calcite.adapter.druid;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -29,17 +30,43 @@ import java.io.IOException;
public class DefaultDimensionSpec implements DimensionSpec {
private final String dimension;
+ private final String outputName;
+ private final DruidType outputType;
+
+ /**
+ * Creates a dimension spec with an explicit output name and output type.
+ *
+ * @param dimension dimension name, must not be null
+ * @param outputName value written as "outputName", must not be null
+ * @param outputType value written as "outputType"; null falls back to DruidType.STRING
+ */
+ public DefaultDimensionSpec(String dimension, String outputName, DruidType outputType) {
+ this.dimension = Preconditions.checkNotNull(dimension);
+ this.outputName = Preconditions.checkNotNull(outputName);
+ this.outputType = outputType == null ? DruidType.STRING : outputType;
+ }
+ // Convenience constructor: the dimension name doubles as the output name and the
+ // output type is left to the default of the three-argument constructor.
public DefaultDimensionSpec(String dimension) {
- this.dimension = dimension;
+ this(dimension, dimension, null);
}
+ // Writes a JSON object with the fields "type" (always "default"), "dimension",
+ // "outputName" and "outputType".
@Override public void write(JsonGenerator generator) throws IOException {
generator.writeStartObject();
generator.writeStringField("type", "default");
generator.writeStringField("dimension", dimension);
+ generator.writeStringField("outputName", outputName);
+ generator.writeStringField("outputType", outputType.name());
generator.writeEndObject();
}
+
+ @Override public String getOutputName() {
+ return outputName;
+ }
+
+ @Override public DruidType getOutputType() {
+ return outputType;
+ }
+
+ // A default dimension spec carries no extraction function.
+ @Override public ExtractionFunction getExtractionFn() {
+ return null;
+ }
+
+ @Override public String getDimension() {
+ return dimension;
+ }
}
// End DefaultDimensionSpec.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
index 45625c3..14c02e6 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DimensionSpec.java
@@ -16,12 +16,18 @@
*/
package org.apache.calcite.adapter.druid;
+import javax.annotation.Nullable;
+
/**
* Interface for Druid DimensionSpec.
*
* <p>DimensionSpecs define how dimension values get transformed prior to aggregation.
*/
-public interface DimensionSpec extends DruidQuery.Json {
+public interface DimensionSpec extends DruidJson {
+ /** Returns the output name of this dimension spec. */
+ String getOutputName();
+ /** Returns the output type of this dimension spec. */
+ DruidType getOutputType();
+ /** Returns the extraction function of this dimension spec; may be null. */
+ @Nullable ExtractionFunction getExtractionFn();
+ /** Returns the dimension this spec refers to. */
+ String getDimension();
}
// End DimensionSpec.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
new file mode 100644
index 0000000..c937e83
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DirectOperatorConversion.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import java.util.List;
+
+/**
+ * Converts a Calcite call into a Druid function call of the shape
+ * {@code function(exp_1,...exp_n)}, passing the translated operands through in order.
+ */
+public class DirectOperatorConversion implements DruidSqlOperatorConverter {
+  private final SqlOperator operator;
+  private final String druidFunctionName;
+
+  /**
+   * @param operator Calcite operator handled by this conversion
+   * @param druidFunctionName name of the Druid function to emit
+   */
+  public DirectOperatorConversion(final SqlOperator operator, final String druidFunctionName) {
+    this.operator = operator;
+    this.druidFunctionName = druidFunctionName;
+  }
+
+  @Override public SqlOperator calciteOperator() {
+    return operator;
+  }
+
+  /**
+   * Translates every operand and wraps the results in a call to the Druid function.
+   *
+   * @return the Druid function call, or null when an operand cannot be translated
+   */
+  @Override public String toDruidExpression(RexNode rexNode, RelDataType rowType,
+      DruidQuery druidQuery) {
+    final List<String> arguments = DruidExpressions.toDruidExpressions(
+        druidQuery, rowType, ((RexCall) rexNode).getOperands());
+    return arguments == null
+        ? null
+        : DruidExpressions.functionCall(druidFunctionName, arguments);
+  }
+}
+
+// End DirectOperatorConversion.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 4f65dff..40883bf 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -72,12 +73,15 @@ class DruidConnectionImpl implements DruidConnection {
public static final String DEFAULT_RESPONSE_TIMESTAMP_COLUMN = "timestamp";
private static final SimpleDateFormat UTC_TIMESTAMP_FORMAT;
+ private static final SimpleDateFormat TIMESTAMP_FORMAT;
static {
final TimeZone utc = DateTimeUtils.UTC_ZONE;
UTC_TIMESTAMP_FORMAT =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT);
UTC_TIMESTAMP_FORMAT.setTimeZone(utc);
+ TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+ TIMESTAMP_FORMAT.setTimeZone(utc);
}
DruidConnectionImpl(String url, String coordinatorUrl) {
@@ -132,6 +136,10 @@ class DruidConnectionImpl implements DruidConnection {
int posTimestampField = -1;
for (int i = 0; i < fieldTypes.size(); i++) {
+ /* TODO: This needs to be revisited. The logic seems to imply that only
+ one column of type timestamp is present, which is not necessarily true;
+ see https://issues.apache.org/jira/browse/CALCITE-2175
+ */
if (fieldTypes.get(i) == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) {
posTimestampField = i;
break;
@@ -324,30 +332,41 @@ class DruidConnectionImpl implements DruidConnection {
}
if (isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP == type) {
- try {
- final long timeInMillis;
-
- if (token == JsonToken.VALUE_NUMBER_INT) {
- timeInMillis = parser.getLongValue();
- } else {
+ final int fieldPos = posTimestampField != -1 ? posTimestampField : i;
+ if (token == JsonToken.VALUE_NUMBER_INT) {
+ rowBuilder.set(posTimestampField, parser.getLongValue());
+ return;
+ } else {
+ // We don't have any way to figure out the format of time upfront since we only have
+ // org.apache.calcite.avatica.ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP as type to represent
+ // both timestamp and timestamp with local timezone.
+ // Logic where type is inferred can be found at DruidQuery.DruidQueryNode.getPrimitive()
+ // Thus need to guess via try and catch
+ synchronized (UTC_TIMESTAMP_FORMAT) {
// synchronized block to avoid race condition
- synchronized (UTC_TIMESTAMP_FORMAT) {
- timeInMillis = UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime();
+ try {
+ // First try to parse as timestamp with time zone.
+ rowBuilder
+ .set(fieldPos, UTC_TIMESTAMP_FORMAT.parse(parser.getText()).getTime());
+ } catch (ParseException e) {
+ // swallow the exception and try timestamp format
+ try {
+ rowBuilder
+ .set(fieldPos, TIMESTAMP_FORMAT.parse(parser.getText()).getTime());
+ } catch (ParseException e2) {
+ // unknown format should not happen
+ Throwables.propagate(e2);
+ }
}
}
- if (posTimestampField != -1) {
- rowBuilder.set(posTimestampField, timeInMillis);
- }
- } catch (ParseException e) {
- // ignore bad value
+ return;
}
- return;
}
switch (token) {
case VALUE_NUMBER_INT:
if (type == null) {
- type = ColumnMetaData.Rep.INTEGER;
+ type = ColumnMetaData.Rep.LONG;
}
// fall through
case VALUE_NUMBER_FLOAT:
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 2a9851a..91f5fa4 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import org.joda.time.Interval;
+import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.slf4j.Logger;
@@ -46,6 +47,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
+import javax.annotation.Nullable;
+
/**
* Utilities for generating intervals from RexNode.
*/
@@ -62,6 +65,7 @@ public class DruidDateTimeUtils {
* expression. Assumes that all the predicates in the input
* reference a single column: the timestamp column.
*/
+ @Nullable
public static List<Interval> createInterval(RexNode e, String timeZone) {
final List<Range<TimestampString>> ranges =
extractRanges(e, TimeZone.getTimeZone(timeZone), false);
@@ -111,6 +115,7 @@ public class DruidDateTimeUtils {
return intervals;
}
+ @Nullable
protected static List<Range<TimestampString>> extractRanges(RexNode node,
TimeZone timeZone, boolean withNot) {
switch (node.getKind()) {
@@ -171,6 +176,7 @@ public class DruidDateTimeUtils {
}
}
+ @Nullable
protected static List<Range<TimestampString>> leafToRanges(RexCall call,
TimeZone timeZone, boolean withNot) {
switch (call.getKind()) {
@@ -249,6 +255,7 @@ public class DruidDateTimeUtils {
}
}
+ @Nullable
protected static TimestampString literalValue(RexNode node, TimeZone timeZone) {
switch (node.getKind()) {
case LITERAL:
@@ -318,6 +325,67 @@ public class DruidDateTimeUtils {
return Granularities.createGranularity(timeUnit, timeZone);
}
+  /**
+   * Translates a Druid granularity into the ISO-8601 period that spans one unit of it;
+   * QUARTER is rendered as a period of three months.
+   *
+   * @param type Druid Granularity to translate as period of time, may be null
+   *
+   * @return String representing the granularity as ISO8601 Period of Time, null for a
+   * null or unknown granularity.
+   */
+  @Nullable
+  public static String toISOPeriodFormat(Granularity.Type type) {
+    if (type == null) {
+      // switching on a null enum throws NullPointerException; report "unknown" instead,
+      // in line with the null handling of toDruidGranularity
+      return null;
+    }
+    switch (type) {
+    case SECOND:
+      return Period.seconds(1).toString();
+    case MINUTE:
+      return Period.minutes(1).toString();
+    case HOUR:
+      return Period.hours(1).toString();
+    case DAY:
+      return Period.days(1).toString();
+    case WEEK:
+      return Period.weeks(1).toString();
+    case MONTH:
+      return Period.months(1).toString();
+    case QUARTER:
+      return Period.months(3).toString();
+    case YEAR:
+      return Period.years(1).toString();
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * Maps a Calcite {@code TimeUnitRange} to the Druid {@link Granularity} type of the
+   * same name.
+   *
+   * @param timeUnit Calcite time unit to convert, may be null
+   *
+   * @return the matching Druid granularity type, or null when {@code timeUnit} is null
+   * or is not one of YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND
+   */
+  @Nullable
+  public static Granularity.Type toDruidGranularity(TimeUnitRange timeUnit) {
+    Granularity.Type granularity = null;
+    if (timeUnit != null) {
+      switch (timeUnit) {
+      case YEAR:
+        granularity = Granularity.Type.YEAR;
+        break;
+      case QUARTER:
+        granularity = Granularity.Type.QUARTER;
+        break;
+      case MONTH:
+        granularity = Granularity.Type.MONTH;
+        break;
+      case WEEK:
+        granularity = Granularity.Type.WEEK;
+        break;
+      case DAY:
+        granularity = Granularity.Type.DAY;
+        break;
+      case HOUR:
+        granularity = Granularity.Type.HOUR;
+        break;
+      case MINUTE:
+        granularity = Granularity.Type.MINUTE;
+        break;
+      case SECOND:
+        granularity = Granularity.Type.SECOND;
+        break;
+      default:
+        // every other unit is left unmapped
+        break;
+      }
+    }
+    return granularity;
+  }
}
// End DruidDateTimeUtils.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
new file mode 100644
index 0000000..78cfb0c
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidExpressions.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Chars;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+/**
+ * Expression utility class to transform Calcite expressions to Druid expressions when possible.
+ */
+public class DruidExpressions {
+
+ /**
+ * Mapping from Calcite {@code SqlTypeName} to the native Druid expression type.
+ * Populated once in the static initializer below: fractional types map to DOUBLE,
+ * integer types to LONG, string types to STRING, TIMESTAMP, DATE and
+ * TIMESTAMP_WITH_LOCAL_TIME_ZONE to LONG, and OTHER to COMPLEX.
+ */
+ static final Map<SqlTypeName, DruidType> EXPRESSION_TYPES;
+ /**
+ * Characters, besides letters and digits, that are copied verbatim into Druid string
+ * literals. The array is sorted in the static initializer because it is looked up
+ * with a binary search.
+ */
+ private static final char[] SAFE_CHARS = " ,._-;:(){}[]<>!@#$%^&*`~?/".toCharArray();
+
+ static {
+ final ImmutableMap.Builder<SqlTypeName, DruidType> builder = ImmutableMap.builder();
+
+ for (SqlTypeName type : SqlTypeName.FRACTIONAL_TYPES) {
+ builder.put(type, DruidType.DOUBLE);
+ }
+
+ for (SqlTypeName type : SqlTypeName.INT_TYPES) {
+ builder.put(type, DruidType.LONG);
+ }
+
+ for (SqlTypeName type : SqlTypeName.STRING_TYPES) {
+ builder.put(type, DruidType.STRING);
+ }
+ // Timestamps are treated as longs (millis since the epoch) in Druid expressions.
+ builder.put(SqlTypeName.TIMESTAMP, DruidType.LONG);
+ builder.put(SqlTypeName.DATE, DruidType.LONG);
+ builder.put(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, DruidType.LONG);
+ builder.put(SqlTypeName.OTHER, DruidType.COMPLEX);
+ EXPRESSION_TYPES = builder.build();
+ // Safe chars must be sorted
+ Arrays.sort(SAFE_CHARS);
+ }
+ // Utility class with static members only; not meant to be instantiated.
+ private DruidExpressions() {
+ }
+
+
+  /**
+   * Translates Calcite rexNode to Druid Expression when possible.
+   *
+   * <p>Input references become quoted column names (the table's timestamp field is
+   * mapped to {@code DruidTable.DEFAULT_TIMESTAMP_COLUMN}), calls are delegated to the
+   * converter registered for their operator in {@code druidRel.getOperatorConversionMap()},
+   * and literals are rendered as Druid number or string literals.
+   *
+   * @param rexNode rexNode to convert to a Druid Expression
+   * @param inputRowType input row type of the rexNode to translate
+   * @param druidRel Druid query
+   *
+   * @return Druid Expression or null when can not convert the RexNode
+   */
+  @Nullable
+  public static String toDruidExpression(
+      final RexNode rexNode,
+      final RelDataType inputRowType,
+      final DruidQuery druidRel) {
+    SqlKind kind = rexNode.getKind();
+    SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
+
+    if (kind == SqlKind.INPUT_REF) {
+      final RexInputRef ref = (RexInputRef) rexNode;
+      final String columnName = inputRowType.getFieldNames().get(ref.getIndex());
+      if (columnName == null) {
+        return null;
+      }
+      if (druidRel.getDruidTable().timestampFieldName.equals(columnName)) {
+        return DruidExpressions.fromColumn(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+      }
+      return DruidExpressions.fromColumn(columnName);
+    }
+
+    if (rexNode instanceof RexCall) {
+      final SqlOperator operator = ((RexCall) rexNode).getOperator();
+      final DruidSqlOperatorConverter conversion = druidRel.getOperatorConversionMap()
+          .get(operator);
+      if (conversion == null) {
+        //unknown operator can not translate
+        return null;
+      } else {
+        return conversion.toDruidExpression(rexNode, inputRowType, druidRel);
+      }
+    }
+    if (kind == SqlKind.LITERAL) {
+      // Translate literal.
+      if (RexLiteral.isNullLiteral(rexNode)) {
+        //case the filter/project might yield to unknown let Calcite deal with this for now
+        return null;
+      } else if (SqlTypeName.NUMERIC_TYPES.contains(sqlTypeName)) {
+        return DruidExpressions.numberLiteral((Number) RexLiteral
+            .value(rexNode));
+      } else if (SqlTypeFamily.INTERVAL_DAY_TIME == sqlTypeName.getFamily()) {
+        // Calcite represents DAY-TIME intervals in milliseconds.
+        final long milliseconds = ((Number) RexLiteral.value(rexNode)).longValue();
+        return DruidExpressions.numberLiteral(milliseconds);
+      } else if (SqlTypeFamily.INTERVAL_YEAR_MONTH == sqlTypeName.getFamily()) {
+        // Calcite represents YEAR-MONTH intervals in months.
+        final long months = ((Number) RexLiteral.value(rexNode)).longValue();
+        return DruidExpressions.numberLiteral(months);
+      } else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) {
+        return
+            DruidExpressions.stringLiteral(RexLiteral.stringValue(rexNode));
+      } else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName
+          || SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE == sqlTypeName) {
+        // NOTE(review): EXPRESSION_TYPES registers TIMESTAMP_WITH_LOCAL_TIME_ZONE whereas
+        // this branch tests TIME_WITH_LOCAL_TIME_ZONE - confirm which one is intended.
+        final org.apache.calcite.util.TimestampString timestamp =
+            DruidDateTimeUtils.literalValue(rexNode,
+                TimeZone.getTimeZone(druidRel.getConnectionConfig().timeZone()));
+        if (timestamp == null) {
+          // literalValue is @Nullable: report "cannot translate" rather than failing
+          // with a NullPointerException
+          return null;
+        }
+        return DruidExpressions.numberLiteral(timestamp.getMillisSinceEpoch());
+      } else if (SqlTypeName.BOOLEAN == sqlTypeName) {
+        // booleans are rendered as the numbers 1 and 0
+        return DruidExpressions.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0);
+      }
+    }
+    // Not Literal/InputRef/RexCall or unknown type?
+    return null;
+  }
+
+ /**
+ * Returns the Druid expression that references a column: the column name wrapped in
+ * double quotes.
+ *
+ * <p>NOTE(review): the name is inserted as-is; presumably callers never pass names
+ * that contain a double quote - confirm.
+ */
+ public static String fromColumn(String columnName) {
+ return DruidQuery.format("\"%s\"", columnName);
+ }
+
+ /** Returns the text used for a null literal in Druid expressions, i.e. {@code null}. */
+ public static String nullLiteral() {
+ return "null";
+ }
+
+  /**
+   * Renders a number as a Druid expression literal.
+   *
+   * @param n number to render, may be null
+   * @return {@code n.toString()}, or the null literal when {@code n} is null
+   */
+  public static String numberLiteral(final Number n) {
+    if (n == null) {
+      return nullLiteral();
+    }
+    return n.toString();
+  }
+
+  /**
+   * Renders a string as a single-quoted Druid expression literal, escaping its content.
+   *
+   * @param s string to render, may be null
+   * @return the quoted and escaped literal, or the null literal when {@code s} is null
+   */
+  public static String stringLiteral(final String s) {
+    if (s == null) {
+      return nullLiteral();
+    }
+    return "'" + escape(s) + "'";
+  }
+
+  /**
+   * Escapes the content of a string literal: letters, digits and the characters listed
+   * in {@code SAFE_CHARS} are kept, every other character is written as a backslash-u
+   * sequence followed by the base16 encoding of the character's bytes.
+   */
+  private static String escape(final String s) {
+    final StringBuilder out = new StringBuilder();
+    for (final char ch : s.toCharArray()) {
+      final boolean safe =
+          Character.isLetterOrDigit(ch) || Arrays.binarySearch(SAFE_CHARS, ch) >= 0;
+      if (!safe) {
+        out.append("\\u").append(BaseEncoding.base16().encode(Chars.toByteArray(ch)));
+        continue;
+      }
+      out.append(ch);
+    }
+    return out.toString();
+  }
+
+  /**
+   * Builds the text of a Druid function call, {@code name(arg0,arg1,...)}.
+   *
+   * @param functionName Druid function name, must not be null
+   * @param args already translated arguments; neither the list nor an element may be null
+   * @return the function call expression
+   */
+  public static String functionCall(final String functionName, final List<String> args) {
+    Preconditions.checkNotNull(functionName, "druid functionName");
+    Preconditions.checkNotNull(args, "args");
+
+    final StringBuilder call = new StringBuilder(functionName).append("(");
+    int index = 0;
+    for (final String element : args) {
+      if (index > 0) {
+        call.append(",");
+      }
+      call.append(Preconditions.checkNotNull(element, "arg #%s", index));
+      index++;
+    }
+    return call.append(")").toString();
+  }
+
+  /**
+   * Builds a parenthesized expression in which the operator text is placed between
+   * consecutive arguments, {@code (arg0<op>arg1<op>...)}; no whitespace is added around
+   * the operator.
+   *
+   * @param druidOperator operator text, must not be null
+   * @param args already translated arguments; neither the list nor an element may be null
+   * @return the operator expression
+   */
+  public static String nAryOperatorCall(final String druidOperator, final List<String> args) {
+    Preconditions.checkNotNull(druidOperator, "druid operator missing");
+    Preconditions.checkNotNull(args, "args");
+    final StringBuilder expression = new StringBuilder("(");
+    int index = 0;
+    for (final String element : args) {
+      if (index > 0) {
+        expression.append(druidOperator);
+      }
+      expression.append(Preconditions.checkNotNull(element, "arg #%s", index));
+      index++;
+    }
+    return expression.append(")").toString();
+  }
+
+  /**
+   * Translates each Calcite {@code RexNode} of a list to a Druid expression, giving up
+   * as soon as one of them cannot be translated.
+   *
+   * @param druidRel Druid query the expressions belong to
+   * @param rowType row type the expressions are evaluated against
+   * @param rexNodes list of Calcite expressions meant to be applied on top of the rows
+   *
+   * @return list of Druid expressions in the same order as rexNodes, or null if not possible.
+   * If a non-null list is returned, all elements will be non-null.
+   */
+  @Nullable
+  public static List<String> toDruidExpressions(
+      final DruidQuery druidRel, final RelDataType rowType,
+      final List<RexNode> rexNodes) {
+    final int count = rexNodes.size();
+    final List<String> translated = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      final String expression = toDruidExpression(rexNodes.get(i), rowType, druidRel);
+      if (expression == null) {
+        // one untranslatable node makes the whole list untranslatable
+        return null;
+      }
+      translated.add(expression);
+    }
+    return translated;
+  }
+
+  /**
+   * Builds a {@code timestamp_floor(input,'granularity','origin','timeZoneId')} call.
+   *
+   * @param input already translated Druid expression to floor, must not be null
+   * @param granularity period text, rendered as a string literal, must not be null
+   * @param origin origin text, rendered as a string literal
+   * @param timeZone time zone whose ID is rendered as a string literal
+   * @return the function call expression
+   */
+  public static String applyTimestampFloor(
+      final String input,
+      final String granularity,
+      final String origin,
+      final TimeZone timeZone) {
+    Preconditions.checkNotNull(input, "input");
+    Preconditions.checkNotNull(granularity, "granularity");
+    final String granularityLiteral = DruidExpressions.stringLiteral(granularity);
+    final String originLiteral = DruidExpressions.stringLiteral(origin);
+    final String zoneLiteral = DruidExpressions.stringLiteral(timeZone.getID());
+    final List<String> arguments =
+        ImmutableList.of(input, granularityLiteral, originLiteral, zoneLiteral);
+    return DruidExpressions.functionCall("timestamp_floor", arguments);
+  }
+
+  /**
+   * Builds a {@code timestamp_ceil(input,'granularity','origin','timeZoneId')} call.
+   *
+   * @param input already translated Druid expression to ceil, must not be null
+   * @param granularity period text, rendered as a string literal, must not be null
+   * @param origin origin text, rendered as a string literal
+   * @param timeZone time zone whose ID is rendered as a string literal
+   * @return the function call expression
+   */
+  public static String applyTimestampCeil(
+      final String input,
+      final String granularity,
+      final String origin,
+      final TimeZone timeZone) {
+    Preconditions.checkNotNull(input, "input");
+    Preconditions.checkNotNull(granularity, "granularity");
+    final String granularityLiteral = DruidExpressions.stringLiteral(granularity);
+    final String originLiteral = DruidExpressions.stringLiteral(origin);
+    final String zoneLiteral = DruidExpressions.stringLiteral(timeZone.getID());
+    final List<String> arguments =
+        ImmutableList.of(input, granularityLiteral, originLiteral, zoneLiteral);
+    return DruidExpressions.functionCall("timestamp_ceil", arguments);
+  }
+
+
+  /**
+   * Builds a {@code timestamp_extract(timeExpression,'druidUnit','timeZoneId')} call.
+   *
+   * @param timeExpression already translated Druid expression to extract from
+   * @param druidUnit unit text, rendered as a string literal
+   * @param timeZone time zone whose ID is rendered as a string literal
+   * @return the function call expression
+   */
+  public static String applyTimeExtract(String timeExpression, String druidUnit,
+      TimeZone timeZone) {
+    final String unitLiteral = DruidExpressions.stringLiteral(druidUnit);
+    final String zoneLiteral = DruidExpressions.stringLiteral(timeZone.getID());
+    final List<String> arguments =
+        ImmutableList.of(timeExpression, unitLiteral, zoneLiteral);
+    return DruidExpressions.functionCall("timestamp_extract", arguments);
+  }
+}
+
+// End DruidExpressions.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
new file mode 100644
index 0000000..77ccf4f
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJson.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+
+ /**
+  * Contract for objects that can serialize themselves through a
+  * {@link com.fasterxml.jackson.core.JsonGenerator}.
+  */
+ public interface DruidJson {
+   /**
+    * Writes this object to the given generator.
+    *
+    * @param generator generator that receives the output
+    * @throws IOException declared for failures raised while writing
+    */
+   void write(JsonGenerator generator) throws IOException;
+ }
+
+// End DruidJson.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/98f3704e/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
new file mode 100644
index 0000000..11ec2be
--- /dev/null
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidJsonFilter.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.druid;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimestampString;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Filter element of a Druid "groupBy" or "topN" query.
+ */
+abstract class DruidJsonFilter implements DruidJson {
+
+   /**
+    * Translates an {@code EQUALS} / {@code NOT_EQUALS} call that compares exactly one literal
+    * with one non-literal operand.
+    *
+    * @param rexNode rexNode to translate to Druid Json Filter
+    * @param rowType rowType associated to rexNode
+    * @param druidQuery druid query
+    *
+    * @return Druid Json filter or null if it can not translate
+    */
+   @Nullable
+   private static DruidJsonFilter toEqualityKindDruidFilter(RexNode rexNode, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final SqlKind kind = rexNode.getKind();
+     if (kind != SqlKind.EQUALS && kind != SqlKind.NOT_EQUALS) {
+       throw new AssertionError(
+           DruidQuery.format("Expecting EQUALS or NOT_EQUALS but got [%s]", kind));
+     }
+     final List<RexNode> operands = ((RexCall) rexNode).getOperands();
+     if (operands.size() < 2) {
+       return null;
+     }
+     final RexNode left = operands.get(0);
+     final RexNode right = operands.get(1);
+     final boolean leftIsLiteral = left.getKind() == SqlKind.LITERAL;
+     final boolean rightIsLiteral = right.getKind() == SqlKind.LITERAL;
+     if (leftIsLiteral == rightIsLiteral) {
+       // Either no literal at all or two literals: nothing to compare a column against.
+       return null;
+     }
+     final RexLiteral literal = (RexLiteral) (leftIsLiteral ? left : right);
+     final RexNode reference = leftIsLiteral ? right : left;
+
+     if (RexLiteral.isNullLiteral(literal)) {
+       // IS NULL style filters are not handled here.
+       return null;
+     }
+     final String druidLiteral = toDruidLiteral(literal, rowType, druidQuery);
+     if (druidLiteral == null) {
+       // The literal has no Druid representation.
+       return null;
+     }
+     final boolean numericComparison =
+         reference.getType().getFamily() == SqlTypeFamily.NUMERIC
+             || literal.getType().getFamily() == SqlTypeFamily.NUMERIC;
+     final Pair<String, ExtractionFunction> column =
+         DruidQuery.toDruidColumn(reference, rowType, druidQuery);
+     final String dimension = column.left;
+     final ExtractionFunction extraction = column.right;
+     if (dimension == null) {
+       // The non-literal side does not map to a Druid column.
+       return null;
+     }
+     // A numeric operand is matched with a closed bound [literal, literal] using numeric
+     // ordering; everything else uses a selector.
+     final DruidJsonFilter equalsFilter = numericComparison
+         ? new JsonBound(dimension, druidLiteral, false, druidLiteral, false, true, extraction)
+         : new JsonSelector(dimension, druidLiteral, extraction);
+
+     return kind == SqlKind.EQUALS ? equalsFilter : toNotDruidFilter(equalsFilter);
+   }
+
+
+   /**
+    * Translates a {@code <}, {@code <=}, {@code >} or {@code >=} call that compares exactly one
+    * literal with one non-literal operand into a one-sided bound filter.
+    *
+    * @param rexNode rexNode to translate
+    * @param rowType row type associated to Filter
+    * @param druidQuery druid query
+    *
+    * @return valid Druid Json Bound Filter or null if it can not translate the rexNode.
+    */
+   @Nullable
+   private static DruidJsonFilter toBoundDruidFilter(RexNode rexNode, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final RexCall comparison = (RexCall) rexNode;
+     final List<RexNode> operands = comparison.getOperands();
+     if (operands.size() < 2) {
+       return null;
+     }
+     final RexNode left = operands.get(0);
+     final RexNode right = operands.get(1);
+     final boolean leftIsLiteral = left.getKind() == SqlKind.LITERAL;
+     final boolean rightIsLiteral = right.getKind() == SqlKind.LITERAL;
+     if (leftIsLiteral == rightIsLiteral) {
+       // Either no literal at all or two literals: nothing to bound a column with.
+       return null;
+     }
+     final RexLiteral literal = (RexLiteral) (leftIsLiteral ? left : right);
+     final RexNode reference = leftIsLiteral ? right : left;
+     final boolean referenceOnLeft = rightIsLiteral;
+
+     if (RexLiteral.isNullLiteral(literal)) {
+       // IS NULL style filters are not handled here.
+       return null;
+     }
+     final String druidLiteral = DruidJsonFilter.toDruidLiteral(literal, rowType, druidQuery);
+     if (druidLiteral == null) {
+       // The literal has no Druid representation.
+       return null;
+     }
+     final boolean numericComparison =
+         reference.getType().getFamily() == SqlTypeFamily.NUMERIC
+             || literal.getType().getFamily() == SqlTypeFamily.NUMERIC;
+     final Pair<String, ExtractionFunction> column =
+         DruidQuery.toDruidColumn(reference, rowType, druidQuery);
+     final String dimension = column.left;
+     final ExtractionFunction extraction = column.right;
+     if (dimension == null) {
+       // The non-literal side does not map to a Druid column.
+       return null;
+     }
+
+     final SqlKind kind = comparison.getKind();
+     // "ref < lit" and "lit > ref" make the literal the upper bound;
+     // "lit < ref" and "ref > lit" make it the lower bound.
+     final boolean literalIsUpperBound;
+     switch (kind) {
+     case LESS_THAN:
+     case LESS_THAN_OR_EQUAL:
+       literalIsUpperBound = referenceOnLeft;
+       break;
+     case GREATER_THAN:
+     case GREATER_THAN_OR_EQUAL:
+       literalIsUpperBound = !referenceOnLeft;
+       break;
+     default:
+       return null;
+     }
+     final boolean strict = kind == SqlKind.LESS_THAN || kind == SqlKind.GREATER_THAN;
+     if (literalIsUpperBound) {
+       return new JsonBound(dimension, null, false, druidLiteral, strict, numericComparison,
+           extraction);
+     }
+     return new JsonBound(dimension, druidLiteral, strict, null, false, numericComparison,
+         extraction);
+   }
+
+   /**
+    * Converts a literal into the string form used inside Druid filters.
+    *
+    * <p>Numeric literals are rendered with {@code String.valueOf} of their value, character
+    * literals with their string value, and DATE / TIMESTAMP / TIMESTAMP WITH LOCAL TIME ZONE
+    * literals are formatted with {@code TimeExtractionFunction.ISO_TIME_FORMAT} in the
+    * connection time zone.
+    *
+    * @param rexNode rexNode to translate to Druid literal equivalent; it is cast to
+    *                {@link RexLiteral}, so callers must pass a literal
+    * @param rowType rowType associated to rexNode (not used by this method)
+    * @param druidQuery druid Query, source of the connection time zone
+    *
+    * @return non null string or null if it can not translate to valid Druid equivalent
+    */
+   @Nullable
+   private static String toDruidLiteral(RexNode rexNode, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final RexLiteral rhsLiteral = (RexLiteral) rexNode;
+     final SqlTypeName typeName = rhsLiteral.getTypeName();
+     if (SqlTypeName.NUMERIC_TYPES.contains(typeName)) {
+       return String.valueOf(RexLiteral.value(rhsLiteral));
+     }
+     if (SqlTypeName.CHAR_TYPES.contains(typeName)) {
+       return String.valueOf(RexLiteral.stringValue(rhsLiteral));
+     }
+     if (SqlTypeName.TIMESTAMP == typeName
+         || SqlTypeName.DATE == typeName
+         || SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE == typeName) {
+       // Resolve the time zone once and use the same instance for parsing the literal and for
+       // formatting it. When no zone is configured fall back to the JVM default, which is what
+       // an unconfigured SimpleDateFormat uses, instead of handing a null id to
+       // TimeZone.getTimeZone.
+       final String timeZoneId = druidQuery.getConnectionConfig().timeZone();
+       final TimeZone timeZone = timeZoneId == null
+           ? TimeZone.getDefault()
+           : TimeZone.getTimeZone(timeZoneId);
+       final TimestampString timestampString =
+           DruidDateTimeUtils.literalValue(rexNode, timeZone);
+       if (timestampString == null) {
+         throw new AssertionError(
+             "Cannot translate Literal " + rexNode + " of type "
+                 + typeName + " to TimestampString");
+       }
+       // SimpleDateFormat is not thread-safe, so a fresh instance is created per call, but only
+       // on the temporal path where it is actually needed.
+       final SimpleDateFormat dateFormatter = new SimpleDateFormat(
+           TimeExtractionFunction.ISO_TIME_FORMAT,
+           Locale.ROOT);
+       dateFormatter.setTimeZone(timeZone);
+       //@TODO this is unnecessary we can send time as Long (eg millis since epoch) to druid
+       return dateFormatter.format(timestampString.getMillisSinceEpoch());
+     }
+     // Don't know how to filter on this kind of literal.
+     return null;
+   }
+
+   /**
+    * Translates an {@code IS_NULL} / {@code IS_NOT_NULL} call into a selector filter with a null
+    * value, wrapped in a NOT filter for {@code IS_NOT_NULL}.
+    *
+    * @param rexNode call to translate
+    * @param rowType row type associated to rexNode
+    * @param druidQuery druid query
+    *
+    * @return Druid Json filter, or null when the operand does not resolve to a column name
+    */
+   @Nullable
+   private static DruidJsonFilter toIsNullKindDruidFilter(RexNode rexNode, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final SqlKind kind = rexNode.getKind();
+     if (kind != SqlKind.IS_NULL && kind != SqlKind.IS_NOT_NULL) {
+       throw new AssertionError(
+           DruidQuery.format("Expecting IS_NULL or IS_NOT_NULL but got [%s]", kind));
+     }
+     final RexNode operand = ((RexCall) rexNode).getOperands().get(0);
+     final Pair<String, ExtractionFunction> column =
+         DruidQuery.toDruidColumn(operand, rowType, druidQuery);
+     final String dimension = column.left;
+     final ExtractionFunction extraction = column.right;
+     if (dimension == null) {
+       return null;
+     }
+     final DruidJsonFilter isNullFilter = new JsonSelector(dimension, null, extraction);
+     return kind == SqlKind.IS_NOT_NULL ? toNotDruidFilter(isNullFilter) : isNullFilter;
+   }
+
+   /**
+    * Translates an {@code IN} / {@code NOT_IN} call into a Druid "in" filter, wrapped in a NOT
+    * filter for {@code NOT_IN}.
+    *
+    * <p>Operand 0 is the tested expression and is resolved to a Druid column; every following
+    * operand must be a literal. If any of them is not a literal the method gives up instead of
+    * dropping that value, because an "in" filter built from only part of the list would match a
+    * different set of rows than the original predicate.
+    *
+    * @param e call to translate
+    * @param rowType row type associated to e
+    * @param druidQuery druid query
+    *
+    * @return Druid Json filter, or null when the call can not be translated
+    */
+   @Nullable
+   private static DruidJsonFilter toInKindDruidFilter(RexNode e, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final SqlKind kind = e.getKind();
+     if (kind != SqlKind.IN && kind != SqlKind.NOT_IN) {
+       throw new AssertionError(
+           DruidQuery.format("Expecting IN or NOT IN but got [%s]", kind));
+     }
+     final List<RexNode> operands = ((RexCall) e).getOperands();
+     final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+     for (int i = 1; i < operands.size(); i++) {
+       final RexNode candidate = operands.get(i);
+       if (candidate.getKind() != SqlKind.LITERAL) {
+         // A non-literal value can not be listed in an "in" filter; bail out rather than
+         // silently ignoring it.
+         return null;
+       }
+       final String value = toDruidLiteral(candidate, rowType, druidQuery);
+       if (value == null) {
+         return null;
+       }
+       listBuilder.add(value);
+     }
+     final Pair<String, ExtractionFunction> druidColumn =
+         DruidQuery.toDruidColumn(operands.get(0), rowType, druidQuery);
+     final String columnName = druidColumn.left;
+     final ExtractionFunction extractionFunction = druidColumn.right;
+     if (columnName == null) {
+       return null;
+     }
+     final DruidJsonFilter inFilter =
+         new DruidJsonFilter.JsonInFilter(columnName, listBuilder.build(), extractionFunction);
+     return kind == SqlKind.NOT_IN ? toNotDruidFilter(inFilter) : inFilter;
+   }
+
+   /**
+    * Wraps a filter into a NOT composite filter.
+    *
+    * @param druidJsonFilter filter to negate, may be null
+    *
+    * @return the negated filter, or null when the argument is null
+    */
+   @Nullable
+   protected static DruidJsonFilter toNotDruidFilter(DruidJsonFilter druidJsonFilter) {
+     return druidJsonFilter == null
+         ? null
+         : new JsonCompositeFilter(Type.NOT, druidJsonFilter);
+   }
+
+   /**
+    * Translates a {@code BETWEEN} call with literal bounds into a Druid bound filter that is
+    * inclusive on both ends.
+    *
+    * @param rexNode call to translate
+    * @param rowType row type associated to rexNode
+    * @param query druid query
+    *
+    * @return Druid Json bound filter, or null when the node is not a BETWEEN call, has fewer
+    *     than four operands, has a bound that is not a non-null literal, or references something
+    *     that does not resolve to a column name
+    */
+   @Nullable
+   private static DruidJsonFilter toBetweenDruidFilter(RexNode rexNode, RelDataType rowType,
+       DruidQuery query) {
+     if (rexNode.getKind() != SqlKind.BETWEEN) {
+       return null;
+     }
+     final RexCall rexCall = (RexCall) rexNode;
+     if (rexCall.getOperands().size() < 4) {
+       return null;
+     }
+     // BETWEEN (ASYMMETRIC, REF, 'lower-bound', 'upper-bound')
+     final RexNode refNode = rexCall.getOperands().get(1);
+     final RexNode lhs = rexCall.getOperands().get(2);
+     final RexNode rhs = rexCall.getOperands().get(3);
+
+     if (lhs.getKind() != SqlKind.LITERAL || rhs.getKind() != SqlKind.LITERAL) {
+       // toDruidLiteral casts its argument to RexLiteral, so only literal bounds can be handled.
+       return null;
+     }
+     if (RexLiteral.isNullLiteral(lhs) || RexLiteral.isNullLiteral(rhs)) {
+       // Same policy as the equality and bound translations: NULL literals are not handled.
+       return null;
+     }
+     final String lhsLiteralValue = toDruidLiteral(lhs, rowType, query);
+     final String rhsLiteralValue = toDruidLiteral(rhs, rowType, query);
+     if (lhsLiteralValue == null || rhsLiteralValue == null) {
+       return null;
+     }
+     // Numeric ordering is needed as soon as either bound is numeric.
+     final boolean isNumeric = lhs.getType().getFamily() == SqlTypeFamily.NUMERIC
+         || rhs.getType().getFamily() == SqlTypeFamily.NUMERIC;
+     final Pair<String, ExtractionFunction> druidColumn = DruidQuery
+         .toDruidColumn(refNode, rowType, query);
+     final String columnName = druidColumn.left;
+     final ExtractionFunction extractionFunction = druidColumn.right;
+
+     if (columnName == null) {
+       return null;
+     }
+     return new JsonBound(columnName, lhsLiteralValue, false, rhsLiteralValue,
+         false, isNumeric,
+         extractionFunction);
+   }
+
+   /**
+    * Dispatches a leaf predicate to the translation matching its {@link SqlKind}.
+    *
+    * @param e predicate to translate
+    * @param rowType row type associated to e
+    * @param druidQuery druid query
+    *
+    * @return the translated filter, or null when the kind is not handled here or the selected
+    *     translation returns null
+    */
+   @Nullable
+   private static DruidJsonFilter toSimpleDruidFilter(RexNode e, RelDataType rowType,
+       DruidQuery druidQuery) {
+     final DruidJsonFilter translated;
+     switch (e.getKind()) {
+     case IS_NULL:
+     case IS_NOT_NULL:
+       translated = toIsNullKindDruidFilter(e, rowType, druidQuery);
+       break;
+     case IN:
+     case NOT_IN:
+       translated = toInKindDruidFilter(e, rowType, druidQuery);
+       break;
+     case BETWEEN:
+       translated = toBetweenDruidFilter(e, rowType, druidQuery);
+       break;
+     case LESS_THAN:
+     case LESS_THAN_OR_EQUAL:
+     case GREATER_THAN:
+     case GREATER_THAN_OR_EQUAL:
+       translated = toBoundDruidFilter(e, rowType, druidQuery);
+       break;
+     case EQUALS:
+     case NOT_EQUALS:
+       translated = toEqualityKindDruidFilter(e, rowType, druidQuery);
+       break;
+     default:
+       translated = null;
+       break;
+     }
+     return translated;
+   }
+
+   /**
+    * Translates a predicate into a Druid filter. Composite nodes (AND, OR, NOT and the
+    * IS [NOT] TRUE / FALSE wrappers) are translated recursively; any other node is first tried
+    * as a simple filter and then as an expression filter.
+    *
+    * @param rexNode rexNode to translate to Druid Filter
+    * @param rowType rowType of filter input
+    * @param druidQuery Druid query
+    *
+    * @return Druid Json Filters or null when can not translate to valid Druid Filters.
+    */
+   @Nullable
+   static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType,
+       DruidQuery druidQuery) {
+     if (rexNode.isAlwaysTrue()) {
+       return JsonExpressionFilter.alwaysTrue();
+     }
+     if (rexNode.isAlwaysFalse()) {
+       return JsonExpressionFilter.alwaysFalse();
+     }
+     final SqlKind kind = rexNode.getKind();
+     switch (kind) {
+     case IS_TRUE:
+     case IS_NOT_FALSE: {
+       // Translated as the wrapped operand itself.
+       final RexNode operand = Iterables.getOnlyElement(((RexCall) rexNode).getOperands());
+       return toDruidFilters(operand, rowType, druidQuery);
+     }
+     case IS_NOT_TRUE:
+     case IS_FALSE: {
+       // Translated as the negation of the wrapped operand.
+       final RexNode operand = Iterables.getOnlyElement(((RexCall) rexNode).getOperands());
+       final DruidJsonFilter inner = toDruidFilters(operand, rowType, druidQuery);
+       if (inner == null) {
+         return null;
+       }
+       return new JsonCompositeFilter(Type.NOT, inner);
+     }
+     case AND:
+     case OR:
+     case NOT: {
+       // Every child has to translate, otherwise the whole composite is given up.
+       final List<DruidJsonFilter> children = Lists.newArrayList();
+       for (final RexNode child : ((RexCall) rexNode).getOperands()) {
+         final DruidJsonFilter childFilter = toDruidFilters(child, rowType, druidQuery);
+         if (childFilter == null) {
+           return null;
+         }
+         children.add(childFilter);
+       }
+       return new JsonCompositeFilter(Type.valueOf(kind.name()), children);
+     }
+     default:
+       break;
+     }
+
+     final DruidJsonFilter leaf = toSimpleDruidFilter(rexNode, rowType, druidQuery);
+     if (leaf != null) {
+       return leaf;
+     }
+     return toDruidExpressionFilter(rexNode, rowType, druidQuery);
+   }
+
+   /**
+    * Translates a predicate into an expression filter.
+    *
+    * @param rexNode predicate to translate
+    * @param rowType row type associated to rexNode
+    * @param query druid query
+    *
+    * @return expression filter, or null when {@code DruidExpressions.toDruidExpression}
+    *     returns null
+    */
+   @Nullable
+   private static DruidJsonFilter toDruidExpressionFilter(RexNode rexNode, RelDataType rowType,
+       DruidQuery query) {
+     final String druidExpression = DruidExpressions.toDruidExpression(rexNode, rowType, query);
+     if (druidExpression == null) {
+       return null;
+     }
+     return new JsonExpressionFilter(druidExpression);
+   }
+
+   /**
+    * Supported filter types.
+    */
+   protected enum Type {
+     AND,
+     OR,
+     NOT,
+     SELECTOR,
+     IN,
+     BOUND,
+     EXPRESSION;
+
+     /** Returns the constant name lower-cased with {@link Locale#ROOT}. */
+     public String lowercase() {
+       final String constantName = name();
+       return constantName.toLowerCase(Locale.ROOT);
+     }
+   }
+
+   /** Type of this filter; the nested filters write its lower-cased name as "type". */
+   protected final Type type;
+
+   /** Private so that only the filters nested in this class can extend it. */
+   private DruidJsonFilter(Type type) {
+     this.type = type;
+   }
+
+   /**
+    * Filter of type "expression" carrying a Druid expression string.
+    */
+   public static class JsonExpressionFilter extends DruidJsonFilter {
+     private final String expression;
+
+     JsonExpressionFilter(String expression) {
+       super(Type.EXPRESSION);
+       Preconditions.checkNotNull(expression);
+       this.expression = expression;
+     }
+
+     /** Writes an object with the fields "type" and "expression". */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       final String typeName = type.lowercase();
+       generator.writeStartObject();
+       generator.writeStringField("type", typeName);
+       generator.writeStringField("expression", expression);
+       generator.writeEndObject();
+     }
+
+     /**
+      * Creates a filter whose expression always evaluates to true.
+      */
+     private static JsonExpressionFilter alwaysTrue() {
+       final String tautology = "1 == 1";
+       return new JsonExpressionFilter(tautology);
+     }
+
+     /**
+      * Creates a filter whose expression always evaluates to false.
+      */
+     private static JsonExpressionFilter alwaysFalse() {
+       final String contradiction = "1 == 2";
+       return new JsonExpressionFilter(contradiction);
+     }
+   }
+
+   /**
+    * Filter of type "selector": matches a dimension against a single value. The IS NULL
+    * translation builds it with a null value.
+    */
+   private static class JsonSelector extends DruidJsonFilter {
+     private final String dimension;
+     private final String value;
+     private final ExtractionFunction extractionFunction;
+
+     private JsonSelector(String dimension, String value,
+         ExtractionFunction extractionFunction) {
+       super(Type.SELECTOR);
+       this.extractionFunction = extractionFunction;
+       this.value = value;
+       this.dimension = dimension;
+     }
+
+     /**
+      * Writes "type", "dimension" and "value", then hands "extractionFn" to
+      * {@code DruidQuery.writeFieldIf}.
+      */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       final String typeName = type.lowercase();
+       generator.writeStartObject();
+       generator.writeStringField("type", typeName);
+       generator.writeStringField("dimension", dimension);
+       generator.writeStringField("value", value);
+       DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+       generator.writeEndObject();
+     }
+   }
+
+   /**
+    * Filter of type "bound": restricts a dimension to a range. A null {@code lower} or
+    * {@code upper} leaves that side of the range out of the output.
+    */
+   @VisibleForTesting
+   protected static class JsonBound extends DruidJsonFilter {
+     private final String dimension;
+     private final String lower;
+     private final boolean lowerStrict;
+     private final String upper;
+     private final boolean upperStrict;
+     private final boolean alphaNumeric;
+     private final ExtractionFunction extractionFunction;
+
+     protected JsonBound(String dimension, String lower,
+         boolean lowerStrict, String upper, boolean upperStrict,
+         boolean alphaNumeric, ExtractionFunction extractionFunction) {
+       super(Type.BOUND);
+       this.dimension = dimension;
+       this.lower = lower;
+       this.upper = upper;
+       this.lowerStrict = lowerStrict;
+       this.upperStrict = upperStrict;
+       this.alphaNumeric = alphaNumeric;
+       this.extractionFunction = extractionFunction;
+     }
+
+     /**
+      * Writes "type" and "dimension", each present bound with its strictness flag, the
+      * "ordering" ("numeric" when {@code alphaNumeric} is set, "lexicographic" otherwise), then
+      * hands "extractionFn" to {@code DruidQuery.writeFieldIf}.
+      */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       final boolean hasLower = lower != null;
+       final boolean hasUpper = upper != null;
+       final String ordering = alphaNumeric ? "numeric" : "lexicographic";
+
+       generator.writeStartObject();
+       generator.writeStringField("type", type.lowercase());
+       generator.writeStringField("dimension", dimension);
+       if (hasLower) {
+         generator.writeStringField("lower", lower);
+         generator.writeBooleanField("lowerStrict", lowerStrict);
+       }
+       if (hasUpper) {
+         generator.writeStringField("upper", upper);
+         generator.writeBooleanField("upperStrict", upperStrict);
+       }
+       generator.writeStringField("ordering", ordering);
+       DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+       generator.writeEndObject();
+     }
+   }
+
+   /**
+    * Filter that combines other filters using a boolean operator.
+    */
+   private static class JsonCompositeFilter extends DruidJsonFilter {
+     private final List<? extends DruidJsonFilter> fields;
+
+     private JsonCompositeFilter(Type type,
+         Iterable<? extends DruidJsonFilter> fields) {
+       super(type);
+       this.fields = ImmutableList.copyOf(fields);
+     }
+
+     private JsonCompositeFilter(Type type, DruidJsonFilter... fields) {
+       this(type, ImmutableList.copyOf(fields));
+     }
+
+     /**
+      * Writes "type" followed by the operands: a NOT filter writes its first operand as
+      * "field", every other type writes the whole list as "fields".
+      */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       generator.writeStartObject();
+       generator.writeStringField("type", type.lowercase());
+       if (type == Type.NOT) {
+         DruidQuery.writeField(generator, "field", fields.get(0));
+       } else {
+         DruidQuery.writeField(generator, "fields", fields);
+       }
+       generator.writeEndObject();
+     }
+   }
+
+   /**
+    * Filter of type "in": matches a dimension against a list of values.
+    */
+   protected static class JsonInFilter extends DruidJsonFilter {
+     private final String dimension;
+     private final List<String> values;
+     private final ExtractionFunction extractionFunction;
+
+     protected JsonInFilter(String dimension, List<String> values,
+         ExtractionFunction extractionFunction) {
+       super(Type.IN);
+       this.extractionFunction = extractionFunction;
+       this.values = values;
+       this.dimension = dimension;
+     }
+
+     /**
+      * Writes "type", "dimension" and "values", then hands "extractionFn" to
+      * {@code DruidQuery.writeFieldIf}.
+      */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       final String typeName = type.lowercase();
+       generator.writeStartObject();
+       generator.writeStringField("type", typeName);
+       generator.writeStringField("dimension", dimension);
+       DruidQuery.writeField(generator, "values", values);
+       DruidQuery.writeFieldIf(generator, "extractionFn", extractionFunction);
+       generator.writeEndObject();
+     }
+   }
+
+   /**
+    * Creates a selector filter.
+    *
+    * @param column dimension to filter on; must not be null
+    * @param value value to match
+    * @param extractionFunction extraction function passed to the selector
+    *
+    * @return a new selector filter
+    */
+   public static DruidJsonFilter getSelectorFilter(String column, String value,
+       ExtractionFunction extractionFunction) {
+     Preconditions.checkNotNull(column);
+     final DruidJsonFilter selector = new JsonSelector(column, value, extractionFunction);
+     return selector;
+   }
+
+   /**
+    * Druid Having Filter spec: wraps a filter in an object of type "filter".
+    */
+   protected static class JsonDimHavingFilter implements DruidJson {
+
+     private final DruidJsonFilter filter;
+
+     public JsonDimHavingFilter(DruidJsonFilter filter) {
+       this.filter = filter;
+     }
+
+     /** Writes the constant type "filter" followed by the wrapped filter as "filter". */
+     @Override public void write(JsonGenerator generator) throws IOException {
+       final String havingType = "filter";
+       generator.writeStartObject();
+       generator.writeStringField("type", havingType);
+       DruidQuery.writeField(generator, "filter", filter);
+       generator.writeEndObject();
+     }
+   }
+}
+
+// End DruidJsonFilter.java