You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/09/12 20:24:39 UTC
[07/31] hive git commit: HIVE-14217: Druid integration (Jesus Camacho
Rodriguez, reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
new file mode 100644
index 0000000..43982aa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
@@ -0,0 +1,1053 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Relational expression representing a scan of a Druid data set.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidQuery extends TableScan {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidQuery.class);
+
+ protected QuerySpec querySpec;
+
+ final DruidTable druidTable;
+ final List<Interval> intervals;
+ final ImmutableList<RelNode> rels;
+
+ private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
+
+ /**
+ * Creates a DruidQuery.
+ *
+ * @param cluster Cluster
+ * @param traitSet Traits
+ * @param table Table
+ * @param druidTable Druid table
+ * @param interval Interval for the query
+ * @param rels Internal relational expressions
+ */
+ private DruidQuery(RelOptCluster cluster, RelTraitSet traitSet,
+ RelOptTable table, DruidTable druidTable,
+ List<Interval> intervals, List<RelNode> rels) {
+ super(cluster, traitSet, table);
+ this.druidTable = druidTable;
+ this.intervals = ImmutableList.copyOf(intervals);
+ this.rels = ImmutableList.copyOf(rels);
+
+ assert isValid(Litmus.THROW);
+ }
+
+ /** Returns a string describing the operations inside this query.
+ *
+ * <p>For example, "sfpal" means {@link TableScan} (s)
+ * followed by {@link Filter} (f)
+ * followed by {@link Project} (p)
+ * followed by {@link Aggregate} (a)
+ * followed by {@link Sort} (l).
+ *
+ * @see #isValidSignature(String)
+ */
+ String signature() {
+ final StringBuilder b = new StringBuilder();
+ for (RelNode rel : rels) {
+ b.append(rel instanceof TableScan ? 's'
+ : rel instanceof Project ? 'p'
+ : rel instanceof Filter ? 'f'
+ : rel instanceof Aggregate ? 'a'
+ : rel instanceof Sort ? 'l'
+ : '!');
+ }
+ return b.toString();
+ }
+
+ @Override public boolean isValid(Litmus litmus) {
+ if (!super.isValid(litmus)) {
+ return false;
+ }
+ final String signature = signature();
+ if (!isValidSignature(signature)) {
+ return litmus.fail("invalid signature");
+ }
+ if (rels.isEmpty()) {
+ return litmus.fail("must have at least one rel");
+ }
+ for (int i = 0; i < rels.size(); i++) {
+ final RelNode r = rels.get(i);
+ if (i == 0) {
+ if (!(r instanceof TableScan)) {
+ return litmus.fail("first rel must be TableScan");
+ }
+ if (r.getTable() != table) {
+ return litmus.fail("first rel must be based on table table");
+ }
+ } else {
+ final List<RelNode> inputs = r.getInputs();
+ if (inputs.size() != 1 || inputs.get(0) != rels.get(i - 1)) {
+ return litmus.fail("each rel must have a single input");
+ }
+ if (r instanceof Aggregate) {
+ final Aggregate aggregate = (Aggregate) r;
+ if (aggregate.getGroupSets().size() != 1
+ || aggregate.indicator) {
+ return litmus.fail("no grouping sets");
+ }
+ for (AggregateCall call : aggregate.getAggCallList()) {
+ if (call.filterArg >= 0) {
+ return litmus.fail("no filtered aggregate functions");
+ }
+ }
+ }
+ if (r instanceof Filter) {
+ final Filter filter = (Filter) r;
+ if (!isValidFilter(filter.getCondition())) {
+ return litmus.fail("invalid filter");
+ }
+ }
+ if (r instanceof Sort) {
+ final Sort sort = (Sort) r;
+ if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
+ return litmus.fail("offset not supported");
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ boolean isValidFilter(RexNode e) {
+ switch (e.getKind()) {
+ case INPUT_REF:
+ case LITERAL:
+ return true;
+ case AND:
+ case OR:
+ case NOT:
+ case EQUALS:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ case BETWEEN:
+ case IN:
+ case CAST:
+ return areValidFilters(((RexCall) e).getOperands());
+ default:
+ return false;
+ }
+ }
+
+ private boolean areValidFilters(List<RexNode> es) {
+ for (RexNode e : es) {
+ if (!isValidFilter(e)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** Returns whether a signature represents an sequence of relational operators
+ * that can be translated into a valid Druid query. */
+ static boolean isValidSignature(String signature) {
+ return VALID_SIG.matcher(signature).matches();
+ }
+
+ /** Creates a DruidQuery. */
+ public static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
+ RelOptTable table, DruidTable druidTable, List<RelNode> rels) {
+ return new DruidQuery(cluster, traitSet, table, druidTable, druidTable.intervals, rels);
+ }
+
+ /** Creates a DruidQuery. */
+ private static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
+ RelOptTable table, DruidTable druidTable, List<Interval> intervals, List<RelNode> rels) {
+ return new DruidQuery(cluster, traitSet, table, druidTable, intervals, rels);
+ }
+
+ /** Extends a DruidQuery. */
+ public static DruidQuery extendQuery(DruidQuery query, RelNode r) {
+ final ImmutableList.Builder<RelNode> builder = ImmutableList.builder();
+ return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
+ query.druidTable, query.intervals, builder.addAll(query.rels).add(r).build());
+ }
+
+ /** Extends a DruidQuery. */
+ public static DruidQuery extendQuery(DruidQuery query, List<Interval> intervals) {
+ return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
+ query.druidTable, intervals, query.rels);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return this;
+ }
+
+ @Override public RelDataType deriveRowType() {
+ return getCluster().getTypeFactory().createStructType(
+ Pair.right(Util.last(rels).getRowType().getFieldList()),
+ getQuerySpec().fieldNames);
+ }
+
+ public TableScan getTableScan() {
+ return (TableScan) rels.get(0);
+ }
+
+ public RelNode getTopNode() {
+ return Util.last(rels);
+ }
+
+ @Override public RelOptTable getTable() {
+ return table;
+ }
+
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ for (RelNode rel : rels) {
+ if (rel instanceof TableScan) {
+ TableScan tableScan = (TableScan) rel;
+ pw.item("table", tableScan.getTable().getQualifiedName());
+ pw.item("intervals", intervals);
+ } else if (rel instanceof Filter) {
+ pw.item("filter", ((Filter) rel).getCondition());
+ } else if (rel instanceof Project) {
+ pw.item("projects", ((Project) rel).getProjects());
+ } else if (rel instanceof Aggregate) {
+ final Aggregate aggregate = (Aggregate) rel;
+ pw.item("groups", aggregate.getGroupSet())
+ .item("aggs", aggregate.getAggCallList());
+ } else if (rel instanceof Sort) {
+ final Sort sort = (Sort) rel;
+ for (Ord<RelFieldCollation> ord
+ : Ord.zip(sort.collation.getFieldCollations())) {
+ pw.item("sort" + ord.i, ord.e.getFieldIndex());
+ }
+ for (Ord<RelFieldCollation> ord
+ : Ord.zip(sort.collation.getFieldCollations())) {
+ pw.item("dir" + ord.i, ord.e.shortString());
+ }
+ pw.itemIf("fetch", sort.fetch, sort.fetch != null);
+ } else {
+ throw new AssertionError("rel type not supported in Druid query "
+ + rel);
+ }
+ }
+ return pw;
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ // Heuristic: we assume pushing query to Druid reduces cost by 90%
+ return Util.last(rels).computeSelfCost(planner, mq).multiplyBy(.1);
+ }
+
+ @Override public RelNode project(ImmutableBitSet fieldsUsed,
+ Set<RelDataTypeField> extraFields,
+ RelBuilder relBuilder) {
+ final int fieldCount = getRowType().getFieldCount();
+ if (fieldsUsed.equals(ImmutableBitSet.range(fieldCount))
+ && extraFields.isEmpty()) {
+ return this;
+ }
+ final List<RexNode> exprList = new ArrayList<>();
+ final List<String> nameList = new ArrayList<>();
+ final RexBuilder rexBuilder = getCluster().getRexBuilder();
+ final List<RelDataTypeField> fields = getRowType().getFieldList();
+
+ // Project the subset of fields.
+ for (int i : fieldsUsed) {
+ RelDataTypeField field = fields.get(i);
+ exprList.add(rexBuilder.makeInputRef(this, i));
+ nameList.add(field.getName());
+ }
+
+ // Project nulls for the extra fields. (Maybe a sub-class table has
+ // extra fields, but we don't.)
+ for (RelDataTypeField extraField : extraFields) {
+ exprList.add(
+ rexBuilder.ensureType(
+ extraField.getType(),
+ rexBuilder.constantNull(),
+ true));
+ nameList.add(extraField.getName());
+ }
+
+ HiveProject hp = (HiveProject) relBuilder.push(this).project(exprList, nameList).build();
+ hp.setSynthetic();
+ return hp;
+ }
+
+ public QuerySpec getQuerySpec() {
+ if (querySpec == null) {
+ querySpec = deriveQuerySpec();
+ assert querySpec != null : this;
+ }
+ return querySpec;
+ }
+
+ protected QuerySpec deriveQuerySpec() {
+ final RelDataType rowType = table.getRowType();
+ int i = 1;
+
+ RexNode filter = null;
+ if (i < rels.size() && rels.get(i) instanceof Filter) {
+ final Filter filterRel = (Filter) rels.get(i++);
+ filter = filterRel.getCondition();
+ }
+
+ List<RexNode> projects = null;
+ if (i < rels.size() && rels.get(i) instanceof Project) {
+ final Project project = (Project) rels.get(i++);
+ projects = project.getProjects();
+ }
+
+ ImmutableBitSet groupSet = null;
+ List<AggregateCall> aggCalls = null;
+ List<String> aggNames = null;
+ if (i < rels.size() && rels.get(i) instanceof Aggregate) {
+ final Aggregate aggregate = (Aggregate) rels.get(i++);
+ groupSet = aggregate.getGroupSet();
+ aggCalls = aggregate.getAggCallList();
+ aggNames = Util.skip(aggregate.getRowType().getFieldNames(),
+ groupSet.cardinality());
+ }
+
+ List<Integer> collationIndexes = null;
+ List<Direction> collationDirections = null;
+ Integer fetch = null;
+ if (i < rels.size() && rels.get(i) instanceof Sort) {
+ final Sort sort = (Sort) rels.get(i++);
+ collationIndexes = new ArrayList<>();
+ collationDirections = new ArrayList<>();
+ for (RelFieldCollation fCol: sort.collation.getFieldCollations()) {
+ collationIndexes.add(fCol.getFieldIndex());
+ collationDirections.add(fCol.getDirection());
+ }
+ fetch = sort.fetch != null ? RexLiteral.intValue(sort.fetch) : null;
+ }
+
+ if (i != rels.size()) {
+ throw new AssertionError("could not implement all rels");
+ }
+
+ return getQuery(rowType, filter, projects, groupSet, aggCalls, aggNames,
+ collationIndexes, collationDirections, fetch);
+ }
+
+ public String getQueryType() {
+ return getQuerySpec().queryType.getQueryName();
+ }
+
+ public String getQueryString() {
+ return getQuerySpec().queryString;
+ }
+
+ private QuerySpec getQuery(RelDataType rowType, RexNode filter, List<RexNode> projects,
+ ImmutableBitSet groupSet, List<AggregateCall> aggCalls, List<String> aggNames,
+ List<Integer> collationIndexes, List<Direction> collationDirections, Integer fetch) {
+ DruidQueryType queryType = DruidQueryType.SELECT;
+ final Translator translator = new Translator(druidTable, rowType);
+ List<String> fieldNames = rowType.getFieldNames();
+
+ // Handle filter
+ Json jsonFilter = null;
+ if (filter != null) {
+ jsonFilter = translator.translateFilter(filter);
+ }
+
+ // Then we handle project
+ if (projects != null) {
+ translator.metrics.clear();
+ translator.dimensions.clear();
+ final ImmutableList.Builder<String> builder = ImmutableList.builder();
+ for (RexNode project : projects) {
+ builder.add(translator.translate(project, true));
+ }
+ fieldNames = builder.build();
+ }
+
+ // Finally we handle aggregate and sort. Handling of these
+ // operators is more complex, since we need to extract
+ // the conditions to know whether the query will be
+ // executed as a Timeseries, TopN, or GroupBy in Druid
+ final List<String> dimensions = new ArrayList<>();
+ final List<JsonAggregation> aggregations = new ArrayList<>();
+ String granularity = "ALL";
+ Direction timeSeriesDirection = null;
+ JsonLimit limit = null;
+ if (groupSet != null) {
+ assert aggCalls != null;
+ assert aggNames != null;
+ assert aggCalls.size() == aggNames.size();
+
+ int timePositionIdx = -1;
+ final ImmutableList.Builder<String> builder = ImmutableList.builder();
+ if (projects != null) {
+ for (int groupKey : groupSet) {
+ final String s = fieldNames.get(groupKey);
+ final RexNode project = projects.get(groupKey);
+ if (project instanceof RexInputRef) {
+ // Reference, it could be to the timestamp column or any other dimension
+ final RexInputRef ref = (RexInputRef) project;
+ final String origin = druidTable.rowType.getFieldList().get(ref.getIndex()).getName();
+ if (origin.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ granularity = "NONE";
+ builder.add(s);
+ assert timePositionIdx == -1;
+ timePositionIdx = groupKey;
+ } else {
+ dimensions.add(s);
+ builder.add(s);
+ }
+ } else if (project instanceof RexCall) {
+ // Call, check if we should infer granularity
+ RexCall call = (RexCall) project;
+ if (HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) {
+ granularity = call.getOperator().getName();
+ builder.add(s);
+ assert timePositionIdx == -1;
+ timePositionIdx = groupKey;
+ } else {
+ dimensions.add(s);
+ builder.add(s);
+ }
+ } else {
+ throw new AssertionError("incompatible project expression: " + project);
+ }
+ }
+ } else {
+ for (int groupKey : groupSet) {
+ final String s = fieldNames.get(groupKey);
+ if (s.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ granularity = "NONE";
+ builder.add(s);
+ assert timePositionIdx == -1;
+ timePositionIdx = groupKey;
+ } else {
+ dimensions.add(s);
+ builder.add(s);
+ }
+ }
+ }
+
+ for (Pair<AggregateCall, String> agg : Pair.zip(aggCalls, aggNames)) {
+ final JsonAggregation jsonAggregation =
+ getJsonAggregation(fieldNames, agg.right, agg.left);
+ aggregations.add(jsonAggregation);
+ builder.add(jsonAggregation.name);
+ }
+
+ fieldNames = builder.build();
+
+ ImmutableList<JsonCollation> collations = null;
+ boolean sortsMetric = false;
+ if (collationIndexes != null) {
+ assert collationDirections != null;
+ ImmutableList.Builder<JsonCollation> colBuilder = new ImmutableList.Builder<JsonCollation>();
+ for (Pair<Integer,Direction> p : Pair.zip(collationIndexes, collationDirections)) {
+ colBuilder.add(new JsonCollation(fieldNames.get(p.left),
+ p.right == Direction.DESCENDING ? "descending" : "ascending"));
+ if (p.left >= groupSet.cardinality() && p.right == Direction.DESCENDING) {
+ // Currently only support for DESC in TopN
+ sortsMetric = true;
+ } else if (p.left == timePositionIdx) {
+ assert timeSeriesDirection == null;
+ timeSeriesDirection = p.right;
+ }
+ }
+ collations = colBuilder.build();
+ }
+
+ limit = new JsonLimit("default", fetch, collations);
+
+ if (dimensions.isEmpty() && (collations == null || timeSeriesDirection != null)) {
+ queryType = DruidQueryType.TIMESERIES;
+ assert fetch == null;
+ } else if (dimensions.size() == 1 && sortsMetric && collations.size() == 1 && fetch != null) {
+ queryType = DruidQueryType.TOP_N;
+ } else {
+ queryType = DruidQueryType.GROUP_BY;
+ }
+ } else {
+ assert aggCalls == null;
+ assert aggNames == null;
+ assert collationIndexes == null || collationIndexes.isEmpty();
+ assert collationDirections == null || collationDirections.isEmpty();
+ }
+
+ final StringWriter sw = new StringWriter();
+ final JsonFactory factory = new JsonFactory();
+ try {
+ final JsonGenerator generator = factory.createGenerator(sw);
+
+ switch (queryType) {
+ case TIMESERIES:
+ generator.writeStartObject();
+
+ generator.writeStringField("queryType", "timeseries");
+ generator.writeStringField("dataSource", druidTable.dataSource);
+ generator.writeStringField("descending", timeSeriesDirection != null &&
+ timeSeriesDirection == Direction.DESCENDING ? "true" : "false");
+ generator.writeStringField("granularity", granularity);
+ writeFieldIf(generator, "filter", jsonFilter);
+ writeField(generator, "aggregations", aggregations);
+ writeFieldIf(generator, "postAggregations", null);
+ writeField(generator, "intervals", intervals);
+
+ generator.writeEndObject();
+ break;
+
+ case TOP_N:
+ generator.writeStartObject();
+
+ generator.writeStringField("queryType", "topN");
+ generator.writeStringField("dataSource", druidTable.dataSource);
+ generator.writeStringField("granularity", granularity);
+ generator.writeStringField("dimension", dimensions.get(0));
+ generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0)));
+ writeFieldIf(generator, "filter", jsonFilter);
+ writeField(generator, "aggregations", aggregations);
+ writeFieldIf(generator, "postAggregations", null);
+ writeField(generator, "intervals", intervals);
+ generator.writeNumberField("threshold", fetch);
+
+ generator.writeEndObject();
+ break;
+
+ case GROUP_BY:
+ generator.writeStartObject();
+
+ if (aggregations.isEmpty()) {
+ // Druid requires at least one aggregation, otherwise gives:
+ // Must have at least one AggregatorFactory
+ aggregations.add(
+ new JsonAggregation("longSum", "dummy_agg", "dummy_agg"));
+ }
+
+ generator.writeStringField("queryType", "groupBy");
+ generator.writeStringField("dataSource", druidTable.dataSource);
+ generator.writeStringField("granularity", granularity);
+ writeField(generator, "dimensions", dimensions);
+ writeFieldIf(generator, "limitSpec", limit);
+ writeFieldIf(generator, "filter", jsonFilter);
+ writeField(generator, "aggregations", aggregations);
+ writeFieldIf(generator, "postAggregations", null);
+ writeField(generator, "intervals", intervals);
+ writeFieldIf(generator, "having", null);
+
+ generator.writeEndObject();
+ break;
+
+ case SELECT:
+ generator.writeStartObject();
+
+ generator.writeStringField("queryType", "select");
+ generator.writeStringField("dataSource", druidTable.dataSource);
+ generator.writeStringField("descending", "false");
+ writeField(generator, "intervals", intervals);
+ writeFieldIf(generator, "filter", jsonFilter);
+ writeField(generator, "dimensions", translator.dimensions);
+ writeField(generator, "metrics", translator.metrics);
+ generator.writeStringField("granularity", granularity);
+
+ generator.writeFieldName("pagingSpec");
+ generator.writeStartObject();
+ generator.writeNumberField("threshold", fetch != null ? fetch : 1);
+ generator.writeEndObject();
+
+ generator.writeFieldName("context");
+ generator.writeStartObject();
+ generator.writeBooleanField(Constants.DRUID_QUERY_FETCH, fetch != null);
+ generator.writeEndObject();
+
+ generator.writeEndObject();
+ break;
+
+ default:
+ throw new AssertionError("unknown query type " + queryType);
+ }
+
+ generator.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return new QuerySpec(queryType, sw.toString(), fieldNames);
+ }
+
+ private JsonAggregation getJsonAggregation(List<String> fieldNames,
+ String name, AggregateCall aggCall) {
+ final List<String> list = new ArrayList<>();
+ for (Integer arg : aggCall.getArgList()) {
+ list.add(fieldNames.get(arg));
+ }
+ final String only = Iterables.getFirst(list, null);
+ final boolean b = aggCall.getType().getSqlTypeName() == SqlTypeName.DOUBLE;
+ switch (aggCall.getAggregation().getKind()) {
+ case COUNT:
+ if (aggCall.isDistinct()) {
+ return new JsonCardinalityAggregation("cardinality", name, list);
+ }
+ return new JsonAggregation("count", name, only);
+ case SUM:
+ case SUM0:
+ return new JsonAggregation(b ? "doubleSum" : "longSum", name, only);
+ case MIN:
+ return new JsonAggregation(b ? "doubleMin" : "longMin", name, only);
+ case MAX:
+ return new JsonAggregation(b ? "doubleMax" : "longMax", name, only);
+ default:
+ throw new AssertionError("unknown aggregate " + aggCall);
+ }
+ }
+
+ private static void writeField(JsonGenerator generator, String fieldName,
+ Object o) throws IOException {
+ generator.writeFieldName(fieldName);
+ writeObject(generator, o);
+ }
+
+ private static void writeFieldIf(JsonGenerator generator, String fieldName,
+ Object o) throws IOException {
+ if (o != null) {
+ writeField(generator, fieldName, o);
+ }
+ }
+
+ private static void writeArray(JsonGenerator generator, List<?> elements)
+ throws IOException {
+ generator.writeStartArray();
+ for (Object o : elements) {
+ writeObject(generator, o);
+ }
+ generator.writeEndArray();
+ }
+
+ private static void writeObject(JsonGenerator generator, Object o)
+ throws IOException {
+ if (o instanceof String) {
+ String s = (String) o;
+ generator.writeString(s);
+ } else if (o instanceof Interval) {
+ Interval i = (Interval) o;
+ generator.writeString(i.toString());
+ } else if (o instanceof Integer) {
+ Integer i = (Integer) o;
+ generator.writeNumber(i);
+ } else if (o instanceof List) {
+ writeArray(generator, (List<?>) o);
+ } else if (o instanceof Json) {
+ ((Json) o).write(generator);
+ } else {
+ throw new AssertionError("not a json object: " + o);
+ }
+ }
+
+ /** Druid query specification. */
+ public static class QuerySpec {
+ final DruidQueryType queryType;
+ final String queryString;
+ final List<String> fieldNames;
+
+ QuerySpec(DruidQueryType queryType, String queryString,
+ List<String> fieldNames) {
+ this.queryType = Preconditions.checkNotNull(queryType);
+ this.queryString = Preconditions.checkNotNull(queryString);
+ this.fieldNames = ImmutableList.copyOf(fieldNames);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(queryType, queryString, fieldNames);
+ }
+
+ @Override public boolean equals(Object obj) {
+ return obj == this
+ || obj instanceof QuerySpec
+ && queryType == ((QuerySpec) obj).queryType
+ && queryString.equals(((QuerySpec) obj).queryString)
+ && fieldNames.equals(((QuerySpec) obj).fieldNames);
+ }
+
+ @Override public String toString() {
+ return "{queryType: " + queryType
+ + ", queryString: " + queryString
+ + ", fieldNames: " + fieldNames + "}";
+ }
+
+ String getQueryString(String pagingIdentifier, int offset) {
+ if (pagingIdentifier == null) {
+ return queryString;
+ }
+ return queryString.replace("\"threshold\":",
+ "\"pagingIdentifiers\":{\"" + pagingIdentifier + "\":" + offset
+ + "},\"threshold\":");
+ }
+ }
+
+ /** Translates scalar expressions to Druid field references. */
+ private static class Translator {
+ final List<String> dimensions = new ArrayList<>();
+ final List<String> metrics = new ArrayList<>();
+ final DruidTable druidTable;
+ final RelDataType rowType;
+
+ Translator(DruidTable druidTable, RelDataType rowType) {
+ this.druidTable = druidTable;
+ this.rowType = rowType;
+ for (RelDataTypeField f : rowType.getFieldList()) {
+ final String fieldName = f.getName();
+ if (druidTable.metricFieldNames.contains(fieldName)) {
+ metrics.add(fieldName);
+ } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+ dimensions.add(fieldName);
+ }
+ }
+ }
+
+ String translate(RexNode e, boolean set) {
+ switch (e.getKind()) {
+ case INPUT_REF:
+ final RexInputRef ref = (RexInputRef) e;
+ final String fieldName =
+ rowType.getFieldList().get(ref.getIndex()).getName();
+ if (set) {
+ if (druidTable.metricFieldNames.contains(fieldName)) {
+ metrics.add(fieldName);
+ } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+ dimensions.add(fieldName);
+ }
+ }
+ return fieldName;
+
+ case CAST:
+ return tr(e, 0, set);
+
+ case LITERAL:
+ return ((RexLiteral) e).getValue2().toString();
+
+ case OTHER_FUNCTION:
+ final RexCall call = (RexCall) e;
+ assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator());
+ return tr(call, 0, set);
+
+ default:
+ throw new AssertionError("invalid expression " + e);
+ }
+ }
+
+ @SuppressWarnings("incomplete-switch")
+ private JsonFilter translateFilter(RexNode e) {
+ RexCall call;
+ switch (e.getKind()) {
+ case EQUALS:
+ case NOT_EQUALS:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ call = (RexCall) e;
+ int posRef;
+ int posConstant;
+ if (RexUtil.isConstant(call.getOperands().get(1))) {
+ posRef = 0;
+ posConstant = 1;
+ } else if (RexUtil.isConstant(call.getOperands().get(0))) {
+ posRef = 1;
+ posConstant = 0;
+ } else {
+ throw new AssertionError("it is not a valid comparison: " + e);
+ }
+ switch (e.getKind()) {
+ case EQUALS:
+ return new JsonSelector("selector", tr(e, posRef), tr(e, posConstant));
+ case NOT_EQUALS:
+ return new JsonCompositeFilter("not",
+ ImmutableList.of(new JsonSelector("selector", tr(e, posRef), tr(e, posConstant))));
+ case GREATER_THAN:
+ return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), true, null, false,
+ false);
+ case GREATER_THAN_OR_EQUAL:
+ return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), false, null, false,
+ false);
+ case LESS_THAN:
+ return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), true,
+ false);
+ case LESS_THAN_OR_EQUAL:
+ return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), false,
+ false);
+ }
+ case AND:
+ case OR:
+ case NOT:
+ call = (RexCall) e;
+ return new JsonCompositeFilter(e.getKind().toString().toLowerCase(),
+ translateFilters(call.getOperands()));
+ default:
+ throw new AssertionError("cannot translate filter: " + e);
+ }
+ }
+
+ private String tr(RexNode call, int index) {
+ return tr(call, index, false);
+ }
+
+ private String tr(RexNode call, int index, boolean set) {
+ return translate(((RexCall) call).getOperands().get(index), set);
+ }
+
+ private List<JsonFilter> translateFilters(List<RexNode> operands) {
+ final ImmutableList.Builder<JsonFilter> builder =
+ ImmutableList.builder();
+ for (RexNode operand : operands) {
+ builder.add(translateFilter(operand));
+ }
+ return builder.build();
+ }
+ }
+
+ /** Object that knows how to write itself to a
+ * {@link com.fasterxml.jackson.core.JsonGenerator}. */
+ private interface Json {
+ void write(JsonGenerator generator) throws IOException;
+ }
+
+ /** Aggregation element of a Druid "groupBy" or "topN" query. */
+ private static class JsonAggregation implements Json {
+ final String type;
+ final String name;
+ final String fieldName;
+
+ private JsonAggregation(String type, String name, String fieldName) {
+ this.type = type;
+ this.name = name;
+ this.fieldName = fieldName;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ generator.writeStringField("name", name);
+ writeFieldIf(generator, "fieldName", fieldName);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Collation element of a Druid "groupBy" query. */
+ private static class JsonLimit implements Json {
+ final String type;
+ final Integer limit;
+ final ImmutableList<JsonCollation> collations;
+
+ private JsonLimit(String type, Integer limit, ImmutableList<JsonCollation> collations) {
+ this.type = type;
+ this.limit = limit;
+ this.collations = collations;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ writeFieldIf(generator, "limit", limit);
+ writeFieldIf(generator, "columns", collations);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Collation element of a Druid "groupBy" query. */
+ private static class JsonCollation implements Json {
+ final String dimension;
+ final String direction;
+
+ private JsonCollation(String dimension, String direction) {
+ this.dimension = dimension;
+ this.direction = direction;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("dimension", dimension);
+ writeFieldIf(generator, "direction", direction);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Aggregation element that calls the "cardinality" function. */
+ private static class JsonCardinalityAggregation extends JsonAggregation {
+ final List<String> fieldNames;
+
+ private JsonCardinalityAggregation(String type, String name,
+ List<String> fieldNames) {
+ super(type, name, null);
+ this.fieldNames = fieldNames;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ generator.writeStringField("name", name);
+ writeFieldIf(generator, "fieldNames", fieldNames);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Filter element of a Druid "groupBy" or "topN" query. */
+ private abstract static class JsonFilter implements Json {
+ final String type;
+
+ private JsonFilter(String type) {
+ this.type = type;
+ }
+ }
+
+ /** Equality filter. */
+ private static class JsonSelector extends JsonFilter {
+ private final String dimension;
+ private final String value;
+
+ private JsonSelector(String type, String dimension, String value) {
+ super(type);
+ this.dimension = dimension;
+ this.value = value;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ generator.writeStringField("dimension", dimension);
+ generator.writeStringField("value", value);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Bound filter. */
+ private static class JsonBound extends JsonFilter {
+ private final String dimension;
+ private final String lower;
+ private final boolean lowerStrict;
+ private final String upper;
+ private final boolean upperStrict;
+ private final boolean alphaNumeric;
+
+ private JsonBound(String type, String dimension, String lower,
+ boolean lowerStrict, String upper, boolean upperStrict,
+ boolean alphaNumeric) {
+ super(type);
+ this.dimension = dimension;
+ this.lower = lower;
+ this.lowerStrict = lowerStrict;
+ this.upper = upper;
+ this.upperStrict = upperStrict;
+ this.alphaNumeric = alphaNumeric;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ generator.writeStringField("dimension", dimension);
+ if (lower != null) {
+ generator.writeStringField("lower", lower);
+ generator.writeBooleanField("lowerStrict", lowerStrict);
+ }
+ if (upper != null) {
+ generator.writeStringField("upper", upper);
+ generator.writeBooleanField("upperStrict", upperStrict);
+ }
+ generator.writeBooleanField("alphaNumeric", alphaNumeric);
+ generator.writeEndObject();
+ }
+ }
+
+ /** Filter that combines other filters using a boolean operator. */
+ private static class JsonCompositeFilter extends JsonFilter {
+ private final List<? extends JsonFilter> fields;
+
+ private JsonCompositeFilter(String type,
+ List<? extends JsonFilter> fields) {
+ super(type);
+ this.fields = fields;
+ }
+
+ public void write(JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeStringField("type", type);
+ switch (type) {
+ case "NOT":
+ writeField(generator, "field", fields.get(0));
+ break;
+ default:
+ writeField(generator, "fields", fields);
+ }
+ generator.writeEndObject();
+ }
+ }
+
+}
+
+// End DruidQuery.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
new file mode 100644
index 0000000..228b307
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
/**
 * Kinds of Druid query that can be generated. Each constant carries the query
 * name returned by {@link #getQueryName()}.
 *
 * TODO: to be removed when Calcite is upgraded to 1.9
 */
public enum DruidQueryType {
  SELECT("select"),
  TOP_N("topN"),
  GROUP_BY("groupBy"),
  TIMESERIES("timeseries");

  /** Query name as spelled for Druid, e.g. {@code "groupBy"}. */
  private final String druidName;

  DruidQueryType(String druidName) {
    this.druidName = druidName;
  }

  /** Returns the Druid spelling of this query type. */
  public String getQueryName() {
    return druidName;
  }
}
+
+// End DruidQueryType.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
new file mode 100644
index 0000000..f68ffa5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortProjectTransposeRule;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Rules and relational operators for {@link DruidQuery}.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidRules {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidRules.class);
+
+ // Avoid instantiation
+ private DruidRules() {
+ }
+
+ public static final DruidFilterRule FILTER = new DruidFilterRule();
+ public static final DruidProjectRule PROJECT = new DruidProjectRule();
+ public static final DruidAggregateRule AGGREGATE = new DruidAggregateRule();
+ public static final DruidProjectAggregateRule PROJECT_AGGREGATE = new DruidProjectAggregateRule();
+ public static final DruidSortRule SORT = new DruidSortRule();
+ public static final DruidProjectSortRule PROJECT_SORT = new DruidProjectSortRule();
+ public static final DruidSortProjectRule SORT_PROJECT = new DruidSortProjectRule();
+
+ /** Predicate that returns whether Druid can not handle an aggregate. */
+ private static final Predicate<AggregateCall> BAD_AGG = new Predicate<AggregateCall>() {
+ public boolean apply(AggregateCall aggregateCall) {
+ switch (aggregateCall.getAggregation().getKind()) {
+ case COUNT:
+ case SUM:
+ case SUM0:
+ case MIN:
+ case MAX:
+ return false;
+ default:
+ return true;
+ }
+ }
+ };
+
+ /**
+ * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}.
+ */
+ private static class DruidFilterRule extends RelOptRule {
+ private DruidFilterRule() {
+ super(operand(Filter.class,
+ operand(DruidQuery.class, none())));
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ final Filter filter = call.rel(0);
+ final DruidQuery query = call.rel(1);
+ if (!DruidQuery.isValidSignature(query.signature() + 'f')
+ || !query.isValidFilter(filter.getCondition())) {
+ return;
+ }
+ // Timestamp
+ int timestampFieldIdx = -1;
+ for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
+ if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(
+ query.getRowType().getFieldList().get(i).getName())) {
+ timestampFieldIdx = i;
+ break;
+ }
+ }
+ final Pair<List<RexNode>, List<RexNode>> pair = splitFilters(
+ filter.getCluster().getRexBuilder(), query, filter.getCondition(), timestampFieldIdx);
+ if (pair == null) {
+ // We can't push anything useful to Druid.
+ return;
+ }
+ List<Interval> intervals = null;
+ if (!pair.left.isEmpty()) {
+ intervals = DruidIntervalUtils.createInterval(
+ query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
+ pair.left);
+ if (intervals == null) {
+ // We can't push anything useful to Druid.
+ return;
+ }
+ }
+ DruidQuery newDruidQuery = query;
+ if (!pair.right.isEmpty()) {
+ if (!validConditions(pair.right)) {
+ return;
+ }
+ final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
+ RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), pair.right, false));
+ newDruidQuery = DruidQuery.extendQuery(query, newFilter);
+ }
+ if (intervals != null) {
+ newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals);
+ }
+ call.transformTo(newDruidQuery);
+ }
+
+ /* Splits the filter condition in two groups: those that filter on the timestamp column
+ * and those that filter on other fields */
+ private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder,
+ final DruidQuery input, RexNode cond, final int timestampFieldIdx) {
+ final List<RexNode> timeRangeNodes = new ArrayList<>();
+ final List<RexNode> otherNodes = new ArrayList<>();
+ List<RexNode> conjs = RelOptUtil.conjunctions(cond);
+ if (conjs.isEmpty()) {
+ // We do not transform
+ return null;
+ }
+ // Number of columns with the dimensions and timestamp
+ int max = input.getRowType().getFieldCount() - input.druidTable.metricFieldNames.size();
+ for (RexNode conj : conjs) {
+ final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
+ conj.accept(visitor);
+ if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
+ if (visitor.inputPosReferenced.size() != 1) {
+ // Complex predicate, transformation currently not supported
+ return null;
+ }
+ timeRangeNodes.add(conj);
+ } else if (!visitor.inputPosReferenced.tailSet(max).isEmpty()) {
+ // Filter on metrics, not supported in Druid
+ return null;
+ } else {
+ otherNodes.add(conj);
+ }
+ }
+ return Pair.of(timeRangeNodes, otherNodes);
+ }
+
+ /* Checks that all conditions are on ref + literal*/
+ private static boolean validConditions(List<RexNode> nodes) {
+ for (RexNode node: nodes) {
+ try {
+ node.accept(
+ new RexVisitorImpl<Void>(true) {
+ @SuppressWarnings("incomplete-switch")
+ @Override public Void visitCall(RexCall call) {
+ switch (call.getKind()) {
+ case CAST:
+ // Only if on top of ref or literal
+ if (call.getOperands().get(0) instanceof RexInputRef ||
+ call.getOperands().get(0) instanceof RexLiteral) {
+ break;
+ }
+ // Not supported
+ throw Util.FoundOne.NULL;
+ case EQUALS:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ // Check cast
+ RexNode left = call.getOperands().get(0);
+ if (left.getKind() == SqlKind.CAST) {
+ left = ((RexCall)left).getOperands().get(0);
+ }
+ RexNode right = call.getOperands().get(1);
+ if (right.getKind() == SqlKind.CAST) {
+ right = ((RexCall)right).getOperands().get(0);
+ }
+ if (left instanceof RexInputRef && right instanceof RexLiteral) {
+ break;
+ }
+ if (right instanceof RexInputRef && left instanceof RexLiteral) {
+ break;
+ }
+ // Not supported if it is not ref + literal
+ throw Util.FoundOne.NULL;
+ case BETWEEN:
+ case IN:
+ // Not supported here yet
+ throw Util.FoundOne.NULL;
+ }
+ return super.visitCall(call);
+ }
+ });
+ } catch (Util.FoundOne e) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
+ */
+ private static class DruidProjectRule extends RelOptRule {
+ private DruidProjectRule() {
+ super(operand(Project.class,
+ operand(DruidQuery.class, none())));
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ final Project project = call.rel(0);
+ final DruidQuery query = call.rel(1);
+ if (!DruidQuery.isValidSignature(query.signature() + 'p')) {
+ return;
+ }
+
+ if (canProjectAll(project.getProjects())) {
+ // All expressions can be pushed to Druid in their entirety.
+ final RelNode newProject = project.copy(project.getTraitSet(),
+ ImmutableList.of(Util.last(query.rels)));
+ RelNode newNode = DruidQuery.extendQuery(query, newProject);
+ call.transformTo(newNode);
+ return;
+ }
+ final Pair<List<RexNode>, List<RexNode>> pair = splitProjects(
+ project.getCluster().getRexBuilder(), query, project.getProjects());
+ if (pair == null) {
+ // We can't push anything useful to Druid.
+ return;
+ }
+ final List<RexNode> above = pair.left;
+ final List<RexNode> below = pair.right;
+ final RelDataTypeFactory.FieldInfoBuilder builder = project.getCluster().getTypeFactory()
+ .builder();
+ final RelNode input = Util.last(query.rels);
+ for (RexNode e : below) {
+ final String name;
+ if (e instanceof RexInputRef) {
+ name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex());
+ } else {
+ name = null;
+ }
+ builder.add(name, e.getType());
+ }
+ final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build());
+ final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
+ final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above,
+ project.getRowType());
+ call.transformTo(newProject2);
+ }
+
+ private static boolean canProjectAll(List<RexNode> nodes) {
+ for (RexNode e : nodes) {
+ if (!(e instanceof RexInputRef)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder,
+ final RelNode input, List<RexNode> nodes) {
+ final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
+ for (RexNode node : nodes) {
+ node.accept(visitor);
+ }
+ if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) {
+ // All inputs are referenced
+ return null;
+ }
+ final List<RexNode> belowNodes = new ArrayList<>();
+ final List<RelDataType> belowTypes = new ArrayList<>();
+ final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced);
+ for (int i : positions) {
+ final RexNode node = rexBuilder.makeInputRef(input, i);
+ belowNodes.add(node);
+ belowTypes.add(node.getType());
+ }
+ final List<RexNode> aboveNodes = new ArrayList<>();
+ for (RexNode node : nodes) {
+ aboveNodes.add(node.accept(new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef ref) {
+ final int index = positions.indexOf(ref.getIndex());
+ return rexBuilder.makeInputRef(belowTypes.get(index), index);
+ }
+ }));
+ }
+ return Pair.of(aboveNodes, belowNodes);
+ }
+ }
+
+ /**
+ * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}.
+ */
+ private static class DruidAggregateRule extends RelOptRule {
+ private DruidAggregateRule() {
+ super(operand(Aggregate.class,
+ operand(DruidQuery.class, none())));
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final DruidQuery query = call.rel(1);
+ if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
+ return;
+ }
+ if (aggregate.indicator
+ || aggregate.getGroupSets().size() != 1
+ || Iterables.any(aggregate.getAggCallList(), BAD_AGG)
+ || !validAggregate(aggregate, query)) {
+ return;
+ }
+ final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
+ ImmutableList.of(Util.last(query.rels)));
+ call.transformTo(DruidQuery.extendQuery(query, newAggregate));
+ }
+
+ /* Check whether agg functions reference timestamp */
+ private static boolean validAggregate(Aggregate aggregate, DruidQuery query) {
+ ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+ builder.addAll(aggCall.getArgList());
+ }
+ return !checkTimestampRefOnQuery(builder.build(), query.getTopNode());
+ }
+ }
+
+ /**
+ * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and
+ * {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
+ */
+ private static class DruidProjectAggregateRule extends RelOptRule {
+ private DruidProjectAggregateRule() {
+ super(operand(Aggregate.class,
+ operand(Project.class,
+ operand(DruidQuery.class, none()))));
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ final DruidQuery query = call.rel(2);
+ if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
+ return;
+ }
+ int timestampIdx;
+ if ((timestampIdx = validProject(project, query)) == -1) {
+ return;
+ }
+ if (aggregate.indicator
+ || aggregate.getGroupSets().size() != 1
+ || Iterables.any(aggregate.getAggCallList(), BAD_AGG)
+ || !validAggregate(aggregate, timestampIdx)) {
+ return;
+ }
+
+ final RelNode newProject = project.copy(project.getTraitSet(),
+ ImmutableList.of(Util.last(query.rels)));
+ final DruidQuery projectDruidQuery = DruidQuery.extendQuery(query, newProject);
+ final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
+ ImmutableList.of(Util.last(projectDruidQuery.rels)));
+ call.transformTo(DruidQuery.extendQuery(projectDruidQuery, newAggregate));
+ }
+
+ /* To be a valid Project, we allow it to contain references, and a single call
+ * to an EXTRACT function on the timestamp column. Returns the reference to
+ * the timestamp, if any. */
+ private static int validProject(Project project, DruidQuery query) {
+ List<RexNode> nodes = project.getProjects();
+ int idxTimestamp = -1;
+ for (int i = 0; i < nodes.size(); i++) {
+ final RexNode e = nodes.get(i);
+ if (e instanceof RexCall) {
+ // It is a call, check that it is EXTRACT and follow-up conditions
+ final RexCall call = (RexCall) e;
+ if (!HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) {
+ return -1;
+ }
+ if (idxTimestamp != -1) {
+ // Already one usage of timestamp column
+ return -1;
+ }
+ if (!(call.getOperands().get(0) instanceof RexInputRef)) {
+ return -1;
+ }
+ final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
+ if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode()))) {
+ return -1;
+ }
+ idxTimestamp = i;
+ continue;
+ }
+ if (!(e instanceof RexInputRef)) {
+ // It needs to be a reference
+ return -1;
+ }
+ final RexInputRef ref = (RexInputRef) e;
+ if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode())) {
+ if (idxTimestamp != -1) {
+ // Already one usage of timestamp column
+ return -1;
+ }
+ idxTimestamp = i;
+ }
+ }
+ return idxTimestamp;
+ }
+
+ private static boolean validAggregate(Aggregate aggregate, int idx) {
+ if (!aggregate.getGroupSet().get(idx)) {
+ return false;
+ }
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+ if (aggCall.getArgList().contains(idx)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Rule to push an {@link org.apache.calcite.rel.core.Sort} through a
+ * {@link org.apache.calcite.rel.core.Project}. Useful to transform
+ * to complex Druid queries.
+ */
+ private static class DruidProjectSortRule extends HiveSortProjectTransposeRule {
+ private DruidProjectSortRule() {
+ super(operand(Sort.class,
+ operand(Project.class,
+ operand(DruidQuery.class, none()))));
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ return true;
+ }
+
+ }
+
+ /**
+ * Rule to push back {@link org.apache.calcite.rel.core.Project} through a
+ * {@link org.apache.calcite.rel.core.Sort}. Useful if after pushing Sort,
+ * we could not push it inside DruidQuery.
+ */
+ private static class DruidSortProjectRule extends HiveProjectSortTransposeRule {
+ private DruidSortProjectRule() {
+ super(operand(Project.class,
+ operand(Sort.class,
+ operand(DruidQuery.class, none()))));
+ }
+ }
+
+ /**
+ * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}.
+ */
+ private static class DruidSortRule extends RelOptRule {
+ private DruidSortRule() {
+ super(operand(Sort.class,
+ operand(DruidQuery.class, none())));
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ final Sort sort = call.rel(0);
+ final DruidQuery query = call.rel(1);
+ if (!DruidQuery.isValidSignature(query.signature() + 'l')) {
+ return;
+ }
+ // Either it is:
+ // - a sort without limit on the time column on top of
+ // Agg operator (transformable to timeseries query), or
+ // - it is a sort w/o limit on columns that do not include
+ // the time column on top of Agg operator, or
+ // - a simple limit on top of other operator than Agg
+ if (!validSortLimit(sort, query)) {
+ return;
+ }
+ final RelNode newSort = sort.copy(sort.getTraitSet(),
+ ImmutableList.of(Util.last(query.rels)));
+ call.transformTo(DruidQuery.extendQuery(query, newSort));
+ }
+
+ /* Check sort valid */
+ private static boolean validSortLimit(Sort sort, DruidQuery query) {
+ if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
+ // offset not supported by Druid
+ return false;
+ }
+ if (query.getTopNode() instanceof Aggregate) {
+ final Aggregate topAgg = (Aggregate) query.getTopNode();
+ final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder();
+ int metricsRefs = 0;
+ for (RelFieldCollation col : sort.collation.getFieldCollations()) {
+ int idx = col.getFieldIndex();
+ if (idx >= topAgg.getGroupCount()) {
+ metricsRefs++;
+ continue;
+ }
+ positionsReferenced.set(topAgg.getGroupSet().nth(idx));
+ }
+ boolean refsTimestamp =
+ checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput());
+ if (refsTimestamp && metricsRefs != 0) {
+ return false;
+ }
+ return true;
+ }
+ // If it is going to be a Druid select operator, we push the limit iff
+ // 1) it does not contain a sort specification (required by Druid) and
+ // 2) limit is smaller than select threshold, as otherwise it might be
+ // better to obtain some parallelization and let global limit
+ // optimizer kick in
+ HiveDruidConf conf = sort.getCluster().getPlanner()
+ .getContext().unwrap(HiveDruidConf.class);
+ return HiveCalciteUtil.pureLimitRelNode(sort) &&
+ RexLiteral.intValue(sort.fetch) <= conf.getSelectThreshold();
+ }
+ }
+
+ /* Check if any of the references leads to the timestamp column */
+ private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top) {
+ if (top instanceof Project) {
+ ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
+ final Project project = (Project) top;
+ for (int index : set) {
+ RexNode node = project.getProjects().get(index);
+ if (node instanceof RexInputRef) {
+ newSet.set(((RexInputRef)node).getIndex());
+ } else if (node instanceof RexCall) {
+ RexCall call = (RexCall) node;
+ assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator());
+ newSet.set(((RexInputRef)call.getOperands().get(0)).getIndex());
+ }
+ }
+ top = project.getInput();
+ set = newSet.build();
+ }
+
+ // Check if any references the timestamp column
+ for (int index : set) {
+ if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(top.getRowType().getFieldNames().get(index))) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+}
+
+// End DruidRules.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
new file mode 100644
index 0000000..3b3f68a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.Map;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Schema mapped onto a Druid instance.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidSchema extends AbstractSchema {
+ final String url;
+
+ /**
+ * Creates a Druid schema.
+ *
+ * @param url URL of query REST service
+ */
+ public DruidSchema(String url) {
+ this.url = Preconditions.checkNotNull(url);
+ }
+
+ @Override protected Map<String, Table> getTableMap() {
+ final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+ return builder.build();
+ }
+}
+
+// End DruidSchema.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
new file mode 100644
index 0000000..7288291
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.interpreter.BindableConvention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Table mapped onto a Druid table.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidTable extends AbstractTable implements TranslatableTable {
+
+ public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
+ public static final Interval DEFAULT_INTERVAL = new Interval(
+ new DateTime("1900-01-01"),
+ new DateTime("3000-01-01")
+ );
+
+ final DruidSchema schema;
+ final String dataSource;
+ final RelDataType rowType;
+ final RelProtoDataType protoRowType;
+ final ImmutableSet<String> metricFieldNames;
+ final ImmutableList<Interval> intervals;
+ final String timestampFieldName;
+
+ /**
+ * Creates a Druid table.
+ *
+ * @param schema Druid schema that contains this table
+ * @param dataSource Druid data source name
+ * @param protoRowType Field names and types
+ * @param metricFieldNames Names of fields that are metrics
+ * @param interval Default interval if query does not constrain the time
+ * @param timestampFieldName Name of the column that contains the time
+ */
+ public DruidTable(DruidSchema schema, String dataSource,
+ RelProtoDataType protoRowType, Set<String> metricFieldNames,
+ List<Interval> intervals, String timestampFieldName) {
+ this.schema = Preconditions.checkNotNull(schema);
+ this.dataSource = Preconditions.checkNotNull(dataSource);
+ this.rowType = null;
+ this.protoRowType = protoRowType;
+ this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
+ this.intervals = ImmutableList.copyOf(intervals);
+ this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
+ }
+
+ public DruidTable(DruidSchema schema, String dataSource,
+ RelDataType rowType, Set<String> metricFieldNames,
+ List<Interval> intervals, String timestampFieldName) {
+ this.schema = Preconditions.checkNotNull(schema);
+ this.dataSource = Preconditions.checkNotNull(dataSource);
+ this.rowType = Preconditions.checkNotNull(rowType);
+ this.protoRowType = null;
+ this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
+ this.intervals = ImmutableList.copyOf(intervals);
+ this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ final RelDataType thisRowType;
+ if (rowType != null) {
+ thisRowType = rowType;
+ } else {
+ // Generate
+ thisRowType = protoRowType.apply(typeFactory);
+ }
+ final List<String> fieldNames = thisRowType.getFieldNames();
+ Preconditions.checkArgument(fieldNames.contains(timestampFieldName));
+ Preconditions.checkArgument(fieldNames.containsAll(metricFieldNames));
+ return thisRowType;
+ }
+
+ public RelNode toRel(RelOptTable.ToRelContext context,
+ RelOptTable relOptTable) {
+ final RelOptCluster cluster = context.getCluster();
+ final TableScan scan = LogicalTableScan.create(cluster, relOptTable);
+ return DruidQuery.create(cluster,
+ cluster.traitSetOf(BindableConvention.INSTANCE), relOptTable, this,
+ ImmutableList.<RelNode>of(scan));
+ }
+
+}
+
+// End DruidTable.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
new file mode 100644
index 0000000..0686dff
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
/**
 * Druid-related settings exposed to the Calcite planner through its context.
 *
 * <p>Instances are immutable.
 */
public class HiveDruidConf {

  /**
   * Largest limit that may be pushed into a Druid select query as a pure
   * limit; larger limits stay in Hive.
   */
  private final int selectThreshold;

  /**
   * Creates a configuration.
   *
   * @param selectThreshold largest limit pushed into a Druid select query
   */
  public HiveDruidConf(int selectThreshold) {
    this.selectThreshold = selectThreshold;
  }

  /** Returns the largest limit pushed into a Druid select query. */
  public int getSelectThreshold() {
    return selectThreshold;
  }

}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
index bc48707..75b7ad2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
@@ -41,7 +41,7 @@ public class HiveSqlCountAggFunction extends SqlAggFunction implements CanAggreg
SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
super(
"count",
- SqlKind.OTHER_FUNCTION,
+ SqlKind.COUNT,
returnTypeInference,
operandTypeInference,
operandTypeChecker,
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
index 77dca1f..834fc3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
@@ -32,7 +32,7 @@ public class HiveSqlMinMaxAggFunction extends SqlAggFunction {
SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker, boolean isMin) {
super(
isMin ? "min" : "max",
- SqlKind.OTHER_FUNCTION,
+ isMin ? SqlKind.MIN : SqlKind.MAX,
returnTypeInference,
operandTypeInference,
operandTypeChecker,
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
index dc286a2..1d551a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
@@ -58,7 +58,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction implements CanAggregat
SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
super(
"sum",
- SqlKind.OTHER_FUNCTION,
+ SqlKind.SUM,
returnTypeInference,
operandTypeInference,
operandTypeChecker,
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
new file mode 100644
index 0000000..b3f8d9b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import java.util.Set;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Calcite {@link SqlFunction}s named after date/time granularities
+ * (YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND).
+ *
+ * <p>Every instance has kind {@link SqlKind#OTHER_FUNCTION}, return type
+ * {@link ReturnTypes#TIME_NULLABLE}, accepts {@link OperandTypes#ANY}
+ * operands and belongs to {@link SqlFunctionCategory#TIMEDATE}.
+ * NOTE(review): presumably used to floor a timestamp to the named
+ * granularity when building Druid queries; confirm against its callers.
+ */
+public class HiveDateGranularity extends SqlFunction {
+
+  public static final SqlFunction YEAR = new HiveDateGranularity("YEAR");
+  public static final SqlFunction QUARTER = new HiveDateGranularity("QUARTER");
+  public static final SqlFunction MONTH = new HiveDateGranularity("MONTH");
+  public static final SqlFunction WEEK = new HiveDateGranularity("WEEK");
+  public static final SqlFunction DAY = new HiveDateGranularity("DAY");
+  public static final SqlFunction HOUR = new HiveDateGranularity("HOUR");
+  public static final SqlFunction MINUTE = new HiveDateGranularity("MINUTE");
+  public static final SqlFunction SECOND = new HiveDateGranularity("SECOND");
+
+  /**
+   * Every granularity function declared above. The set is immutable: it is a
+   * shared public constant, so callers must not be able to add or remove
+   * entries (a plain HashSet would allow that).
+   */
+  public static final Set<SqlFunction> ALL_FUNCTIONS =
+      ImmutableSet.of(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND);
+
+  /**
+   * Private: the only instances are the constants above.
+   *
+   * @param name the granularity name, also used as the SQL function name
+   */
+  private HiveDateGranularity(String name) {
+    super(
+        name,
+        SqlKind.OTHER_FUNCTION,
+        ReturnTypes.TIME_NULLABLE,
+        null, // no operand type inference
+        OperandTypes.ANY,
+        SqlFunctionCategory.TIMEDATE);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
index aac6126..fd19d99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
@@ -48,6 +49,10 @@ public class HiveProjectSortTransposeRule extends RelOptRule {
operand(HiveSortLimit.class, any())));
}
+ /**
+ * Creates the rule with a caller-supplied operand tree.
+ * Protected so subclasses can match a different pattern; {@code operand}
+ * is passed unchanged to the {@link RelOptRule} constructor.
+ *
+ * @param operand root operand describing the relational pattern to match
+ */
+ protected HiveProjectSortTransposeRule(RelOptRuleOperand operand) {
+ super(operand);
+ }
+
//~ Methods ----------------------------------------------------------------
// implement RelOptRule
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
index feec3c2..fe29850 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
@@ -49,6 +50,10 @@ public class HiveSortProjectTransposeRule extends RelOptRule {
operand(HiveProject.class, any())));
}
+ /**
+ * Creates the rule with a caller-supplied operand tree.
+ * Protected so subclasses can match a different pattern; {@code operand}
+ * is passed unchanged to the {@link RelOptRule} constructor.
+ *
+ * @param operand root operand describing the relational pattern to match
+ */
+ protected HiveSortProjectTransposeRule(RelOptRuleOperand operand) {
+ super(operand);
+ }
+
//~ Methods ----------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 78c76ab..9a5becb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -22,19 +22,21 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
-import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidQuery;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
class ASTBuilder {
@@ -62,14 +64,32 @@ class ASTBuilder {
ASTBuilder.construct(HiveParser.TOK_TABNAME, "TOK_TABNAME")
.add(HiveParser.Identifier, hTbl.getHiveTableMD().getDbName())
.add(HiveParser.Identifier, hTbl.getHiveTableMD().getTableName()));
- // we need to carry the insideView information from calcite into the ast.
- if (((HiveTableScan) scan).isInsideView()) {
- b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(
- ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST").add(
- ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
- .add(HiveParser.StringLiteral, "\"insideView\"")
- .add(HiveParser.StringLiteral, "\"TRUE\""))));
+
+ HiveTableScan hts;
+ if (scan instanceof DruidQuery) {
+ hts = (HiveTableScan) ((DruidQuery)scan).getTableScan();
+ } else {
+ hts = (HiveTableScan) scan;
+ }
+ ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
+ if (scan instanceof DruidQuery) {
+ // Pass possible query to Druid
+ DruidQuery dq = (DruidQuery) scan;
+ propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+ .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_JSON + "\"")
+ .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(
+ dq.getQueryString()) + "\""));
+ propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+ .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_TYPE + "\"")
+ .add(HiveParser.StringLiteral, "\"" + dq.getQueryType() + "\""));
+ }
+ if (hts.isInsideView()) {
+ // We need to carry the insideView information from calcite into the ast.
+ propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+ .add(HiveParser.StringLiteral, "\"insideView\"")
+ .add(HiveParser.StringLiteral, "\"TRUE\""));
}
+ b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(propList));
// NOTE: Calcite considers tbls to be equal if their names are the same. Hence
// we need to provide Calcite the fully qualified table name (dbname.tblname)
@@ -77,7 +97,7 @@ class ASTBuilder {
// However in HIVE DB name can not appear in select list; in case of join
// where table names differ only in DB name, Hive would require user
// introducing explicit aliases for tbl.
- b.add(HiveParser.Identifier, ((HiveTableScan)scan).getTableAlias());
+ b.add(HiveParser.Identifier, hts.getTableAlias());
return b.node();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 40215a2..9f5e733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -56,6 +56,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidQuery;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
@@ -625,7 +626,13 @@ public class ASTConverter {
private static final long serialVersionUID = 1L;
Schema(TableScan scan) {
- String tabName = ((HiveTableScan) scan).getTableAlias();
+ HiveTableScan hts;
+ if (scan instanceof DruidQuery) {
+ hts = (HiveTableScan) ((DruidQuery)scan).getTableScan();
+ } else {
+ hts = (HiveTableScan) scan;
+ }
+ String tabName = hts.getTableAlias();
for (RelDataTypeField field : scan.getRowType().getFieldList()) {
add(new ColumnInfo(tabName, field.getName()));
}