You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/10/19 00:36:15 UTC
[58/62] hive git commit: HIVE-13316: Upgrade to Calcite 1.10 (Jesus
Camacho Rodriguez, reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
deleted file mode 100644
index 43982aa..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
+++ /dev/null
@@ -1,1053 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
-import org.joda.time.Interval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-/**
- * Relational expression representing a scan of a Druid data set.
- *
- * TODO: to be removed when Calcite is upgraded to 1.9
- */
-public class DruidQuery extends TableScan {
-
- protected static final Logger LOG = LoggerFactory.getLogger(DruidQuery.class);
-
- protected QuerySpec querySpec;
-
- final DruidTable druidTable;
- final List<Interval> intervals;
- final ImmutableList<RelNode> rels;
-
- private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
-
- /**
- * Creates a DruidQuery.
- *
- * @param cluster Cluster
- * @param traitSet Traits
- * @param table Table
- * @param druidTable Druid table
- * @param interval Interval for the query
- * @param rels Internal relational expressions
- */
- private DruidQuery(RelOptCluster cluster, RelTraitSet traitSet,
- RelOptTable table, DruidTable druidTable,
- List<Interval> intervals, List<RelNode> rels) {
- super(cluster, traitSet, table);
- this.druidTable = druidTable;
- this.intervals = ImmutableList.copyOf(intervals);
- this.rels = ImmutableList.copyOf(rels);
-
- assert isValid(Litmus.THROW);
- }
-
- /** Returns a string describing the operations inside this query.
- *
- * <p>For example, "sfpal" means {@link TableScan} (s)
- * followed by {@link Filter} (f)
- * followed by {@link Project} (p)
- * followed by {@link Aggregate} (a)
- * followed by {@link Sort} (l).
- *
- * @see #isValidSignature(String)
- */
- String signature() {
- final StringBuilder b = new StringBuilder();
- for (RelNode rel : rels) {
- b.append(rel instanceof TableScan ? 's'
- : rel instanceof Project ? 'p'
- : rel instanceof Filter ? 'f'
- : rel instanceof Aggregate ? 'a'
- : rel instanceof Sort ? 'l'
- : '!');
- }
- return b.toString();
- }
-
- @Override public boolean isValid(Litmus litmus) {
- if (!super.isValid(litmus)) {
- return false;
- }
- final String signature = signature();
- if (!isValidSignature(signature)) {
- return litmus.fail("invalid signature");
- }
- if (rels.isEmpty()) {
- return litmus.fail("must have at least one rel");
- }
- for (int i = 0; i < rels.size(); i++) {
- final RelNode r = rels.get(i);
- if (i == 0) {
- if (!(r instanceof TableScan)) {
- return litmus.fail("first rel must be TableScan");
- }
- if (r.getTable() != table) {
- return litmus.fail("first rel must be based on table table");
- }
- } else {
- final List<RelNode> inputs = r.getInputs();
- if (inputs.size() != 1 || inputs.get(0) != rels.get(i - 1)) {
- return litmus.fail("each rel must have a single input");
- }
- if (r instanceof Aggregate) {
- final Aggregate aggregate = (Aggregate) r;
- if (aggregate.getGroupSets().size() != 1
- || aggregate.indicator) {
- return litmus.fail("no grouping sets");
- }
- for (AggregateCall call : aggregate.getAggCallList()) {
- if (call.filterArg >= 0) {
- return litmus.fail("no filtered aggregate functions");
- }
- }
- }
- if (r instanceof Filter) {
- final Filter filter = (Filter) r;
- if (!isValidFilter(filter.getCondition())) {
- return litmus.fail("invalid filter");
- }
- }
- if (r instanceof Sort) {
- final Sort sort = (Sort) r;
- if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
- return litmus.fail("offset not supported");
- }
- }
- }
- }
- return true;
- }
-
- boolean isValidFilter(RexNode e) {
- switch (e.getKind()) {
- case INPUT_REF:
- case LITERAL:
- return true;
- case AND:
- case OR:
- case NOT:
- case EQUALS:
- case LESS_THAN:
- case LESS_THAN_OR_EQUAL:
- case GREATER_THAN:
- case GREATER_THAN_OR_EQUAL:
- case BETWEEN:
- case IN:
- case CAST:
- return areValidFilters(((RexCall) e).getOperands());
- default:
- return false;
- }
- }
-
- private boolean areValidFilters(List<RexNode> es) {
- for (RexNode e : es) {
- if (!isValidFilter(e)) {
- return false;
- }
- }
- return true;
- }
-
- /** Returns whether a signature represents an sequence of relational operators
- * that can be translated into a valid Druid query. */
- static boolean isValidSignature(String signature) {
- return VALID_SIG.matcher(signature).matches();
- }
-
- /** Creates a DruidQuery. */
- public static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
- RelOptTable table, DruidTable druidTable, List<RelNode> rels) {
- return new DruidQuery(cluster, traitSet, table, druidTable, druidTable.intervals, rels);
- }
-
- /** Creates a DruidQuery. */
- private static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
- RelOptTable table, DruidTable druidTable, List<Interval> intervals, List<RelNode> rels) {
- return new DruidQuery(cluster, traitSet, table, druidTable, intervals, rels);
- }
-
- /** Extends a DruidQuery. */
- public static DruidQuery extendQuery(DruidQuery query, RelNode r) {
- final ImmutableList.Builder<RelNode> builder = ImmutableList.builder();
- return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
- query.druidTable, query.intervals, builder.addAll(query.rels).add(r).build());
- }
-
- /** Extends a DruidQuery. */
- public static DruidQuery extendQuery(DruidQuery query, List<Interval> intervals) {
- return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
- query.druidTable, intervals, query.rels);
- }
-
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert inputs.isEmpty();
- return this;
- }
-
- @Override public RelDataType deriveRowType() {
- return getCluster().getTypeFactory().createStructType(
- Pair.right(Util.last(rels).getRowType().getFieldList()),
- getQuerySpec().fieldNames);
- }
-
- public TableScan getTableScan() {
- return (TableScan) rels.get(0);
- }
-
- public RelNode getTopNode() {
- return Util.last(rels);
- }
-
- @Override public RelOptTable getTable() {
- return table;
- }
-
- @Override public RelWriter explainTerms(RelWriter pw) {
- for (RelNode rel : rels) {
- if (rel instanceof TableScan) {
- TableScan tableScan = (TableScan) rel;
- pw.item("table", tableScan.getTable().getQualifiedName());
- pw.item("intervals", intervals);
- } else if (rel instanceof Filter) {
- pw.item("filter", ((Filter) rel).getCondition());
- } else if (rel instanceof Project) {
- pw.item("projects", ((Project) rel).getProjects());
- } else if (rel instanceof Aggregate) {
- final Aggregate aggregate = (Aggregate) rel;
- pw.item("groups", aggregate.getGroupSet())
- .item("aggs", aggregate.getAggCallList());
- } else if (rel instanceof Sort) {
- final Sort sort = (Sort) rel;
- for (Ord<RelFieldCollation> ord
- : Ord.zip(sort.collation.getFieldCollations())) {
- pw.item("sort" + ord.i, ord.e.getFieldIndex());
- }
- for (Ord<RelFieldCollation> ord
- : Ord.zip(sort.collation.getFieldCollations())) {
- pw.item("dir" + ord.i, ord.e.shortString());
- }
- pw.itemIf("fetch", sort.fetch, sort.fetch != null);
- } else {
- throw new AssertionError("rel type not supported in Druid query "
- + rel);
- }
- }
- return pw;
- }
-
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
- RelMetadataQuery mq) {
- // Heuristic: we assume pushing query to Druid reduces cost by 90%
- return Util.last(rels).computeSelfCost(planner, mq).multiplyBy(.1);
- }
-
- @Override public RelNode project(ImmutableBitSet fieldsUsed,
- Set<RelDataTypeField> extraFields,
- RelBuilder relBuilder) {
- final int fieldCount = getRowType().getFieldCount();
- if (fieldsUsed.equals(ImmutableBitSet.range(fieldCount))
- && extraFields.isEmpty()) {
- return this;
- }
- final List<RexNode> exprList = new ArrayList<>();
- final List<String> nameList = new ArrayList<>();
- final RexBuilder rexBuilder = getCluster().getRexBuilder();
- final List<RelDataTypeField> fields = getRowType().getFieldList();
-
- // Project the subset of fields.
- for (int i : fieldsUsed) {
- RelDataTypeField field = fields.get(i);
- exprList.add(rexBuilder.makeInputRef(this, i));
- nameList.add(field.getName());
- }
-
- // Project nulls for the extra fields. (Maybe a sub-class table has
- // extra fields, but we don't.)
- for (RelDataTypeField extraField : extraFields) {
- exprList.add(
- rexBuilder.ensureType(
- extraField.getType(),
- rexBuilder.constantNull(),
- true));
- nameList.add(extraField.getName());
- }
-
- HiveProject hp = (HiveProject) relBuilder.push(this).project(exprList, nameList).build();
- hp.setSynthetic();
- return hp;
- }
-
- public QuerySpec getQuerySpec() {
- if (querySpec == null) {
- querySpec = deriveQuerySpec();
- assert querySpec != null : this;
- }
- return querySpec;
- }
-
- protected QuerySpec deriveQuerySpec() {
- final RelDataType rowType = table.getRowType();
- int i = 1;
-
- RexNode filter = null;
- if (i < rels.size() && rels.get(i) instanceof Filter) {
- final Filter filterRel = (Filter) rels.get(i++);
- filter = filterRel.getCondition();
- }
-
- List<RexNode> projects = null;
- if (i < rels.size() && rels.get(i) instanceof Project) {
- final Project project = (Project) rels.get(i++);
- projects = project.getProjects();
- }
-
- ImmutableBitSet groupSet = null;
- List<AggregateCall> aggCalls = null;
- List<String> aggNames = null;
- if (i < rels.size() && rels.get(i) instanceof Aggregate) {
- final Aggregate aggregate = (Aggregate) rels.get(i++);
- groupSet = aggregate.getGroupSet();
- aggCalls = aggregate.getAggCallList();
- aggNames = Util.skip(aggregate.getRowType().getFieldNames(),
- groupSet.cardinality());
- }
-
- List<Integer> collationIndexes = null;
- List<Direction> collationDirections = null;
- Integer fetch = null;
- if (i < rels.size() && rels.get(i) instanceof Sort) {
- final Sort sort = (Sort) rels.get(i++);
- collationIndexes = new ArrayList<>();
- collationDirections = new ArrayList<>();
- for (RelFieldCollation fCol: sort.collation.getFieldCollations()) {
- collationIndexes.add(fCol.getFieldIndex());
- collationDirections.add(fCol.getDirection());
- }
- fetch = sort.fetch != null ? RexLiteral.intValue(sort.fetch) : null;
- }
-
- if (i != rels.size()) {
- throw new AssertionError("could not implement all rels");
- }
-
- return getQuery(rowType, filter, projects, groupSet, aggCalls, aggNames,
- collationIndexes, collationDirections, fetch);
- }
-
- public String getQueryType() {
- return getQuerySpec().queryType.getQueryName();
- }
-
- public String getQueryString() {
- return getQuerySpec().queryString;
- }
-
- private QuerySpec getQuery(RelDataType rowType, RexNode filter, List<RexNode> projects,
- ImmutableBitSet groupSet, List<AggregateCall> aggCalls, List<String> aggNames,
- List<Integer> collationIndexes, List<Direction> collationDirections, Integer fetch) {
- DruidQueryType queryType = DruidQueryType.SELECT;
- final Translator translator = new Translator(druidTable, rowType);
- List<String> fieldNames = rowType.getFieldNames();
-
- // Handle filter
- Json jsonFilter = null;
- if (filter != null) {
- jsonFilter = translator.translateFilter(filter);
- }
-
- // Then we handle project
- if (projects != null) {
- translator.metrics.clear();
- translator.dimensions.clear();
- final ImmutableList.Builder<String> builder = ImmutableList.builder();
- for (RexNode project : projects) {
- builder.add(translator.translate(project, true));
- }
- fieldNames = builder.build();
- }
-
- // Finally we handle aggregate and sort. Handling of these
- // operators is more complex, since we need to extract
- // the conditions to know whether the query will be
- // executed as a Timeseries, TopN, or GroupBy in Druid
- final List<String> dimensions = new ArrayList<>();
- final List<JsonAggregation> aggregations = new ArrayList<>();
- String granularity = "ALL";
- Direction timeSeriesDirection = null;
- JsonLimit limit = null;
- if (groupSet != null) {
- assert aggCalls != null;
- assert aggNames != null;
- assert aggCalls.size() == aggNames.size();
-
- int timePositionIdx = -1;
- final ImmutableList.Builder<String> builder = ImmutableList.builder();
- if (projects != null) {
- for (int groupKey : groupSet) {
- final String s = fieldNames.get(groupKey);
- final RexNode project = projects.get(groupKey);
- if (project instanceof RexInputRef) {
- // Reference, it could be to the timestamp column or any other dimension
- final RexInputRef ref = (RexInputRef) project;
- final String origin = druidTable.rowType.getFieldList().get(ref.getIndex()).getName();
- if (origin.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
- granularity = "NONE";
- builder.add(s);
- assert timePositionIdx == -1;
- timePositionIdx = groupKey;
- } else {
- dimensions.add(s);
- builder.add(s);
- }
- } else if (project instanceof RexCall) {
- // Call, check if we should infer granularity
- RexCall call = (RexCall) project;
- if (HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) {
- granularity = call.getOperator().getName();
- builder.add(s);
- assert timePositionIdx == -1;
- timePositionIdx = groupKey;
- } else {
- dimensions.add(s);
- builder.add(s);
- }
- } else {
- throw new AssertionError("incompatible project expression: " + project);
- }
- }
- } else {
- for (int groupKey : groupSet) {
- final String s = fieldNames.get(groupKey);
- if (s.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
- granularity = "NONE";
- builder.add(s);
- assert timePositionIdx == -1;
- timePositionIdx = groupKey;
- } else {
- dimensions.add(s);
- builder.add(s);
- }
- }
- }
-
- for (Pair<AggregateCall, String> agg : Pair.zip(aggCalls, aggNames)) {
- final JsonAggregation jsonAggregation =
- getJsonAggregation(fieldNames, agg.right, agg.left);
- aggregations.add(jsonAggregation);
- builder.add(jsonAggregation.name);
- }
-
- fieldNames = builder.build();
-
- ImmutableList<JsonCollation> collations = null;
- boolean sortsMetric = false;
- if (collationIndexes != null) {
- assert collationDirections != null;
- ImmutableList.Builder<JsonCollation> colBuilder = new ImmutableList.Builder<JsonCollation>();
- for (Pair<Integer,Direction> p : Pair.zip(collationIndexes, collationDirections)) {
- colBuilder.add(new JsonCollation(fieldNames.get(p.left),
- p.right == Direction.DESCENDING ? "descending" : "ascending"));
- if (p.left >= groupSet.cardinality() && p.right == Direction.DESCENDING) {
- // Currently only support for DESC in TopN
- sortsMetric = true;
- } else if (p.left == timePositionIdx) {
- assert timeSeriesDirection == null;
- timeSeriesDirection = p.right;
- }
- }
- collations = colBuilder.build();
- }
-
- limit = new JsonLimit("default", fetch, collations);
-
- if (dimensions.isEmpty() && (collations == null || timeSeriesDirection != null)) {
- queryType = DruidQueryType.TIMESERIES;
- assert fetch == null;
- } else if (dimensions.size() == 1 && sortsMetric && collations.size() == 1 && fetch != null) {
- queryType = DruidQueryType.TOP_N;
- } else {
- queryType = DruidQueryType.GROUP_BY;
- }
- } else {
- assert aggCalls == null;
- assert aggNames == null;
- assert collationIndexes == null || collationIndexes.isEmpty();
- assert collationDirections == null || collationDirections.isEmpty();
- }
-
- final StringWriter sw = new StringWriter();
- final JsonFactory factory = new JsonFactory();
- try {
- final JsonGenerator generator = factory.createGenerator(sw);
-
- switch (queryType) {
- case TIMESERIES:
- generator.writeStartObject();
-
- generator.writeStringField("queryType", "timeseries");
- generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("descending", timeSeriesDirection != null &&
- timeSeriesDirection == Direction.DESCENDING ? "true" : "false");
- generator.writeStringField("granularity", granularity);
- writeFieldIf(generator, "filter", jsonFilter);
- writeField(generator, "aggregations", aggregations);
- writeFieldIf(generator, "postAggregations", null);
- writeField(generator, "intervals", intervals);
-
- generator.writeEndObject();
- break;
-
- case TOP_N:
- generator.writeStartObject();
-
- generator.writeStringField("queryType", "topN");
- generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("granularity", granularity);
- generator.writeStringField("dimension", dimensions.get(0));
- generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0)));
- writeFieldIf(generator, "filter", jsonFilter);
- writeField(generator, "aggregations", aggregations);
- writeFieldIf(generator, "postAggregations", null);
- writeField(generator, "intervals", intervals);
- generator.writeNumberField("threshold", fetch);
-
- generator.writeEndObject();
- break;
-
- case GROUP_BY:
- generator.writeStartObject();
-
- if (aggregations.isEmpty()) {
- // Druid requires at least one aggregation, otherwise gives:
- // Must have at least one AggregatorFactory
- aggregations.add(
- new JsonAggregation("longSum", "dummy_agg", "dummy_agg"));
- }
-
- generator.writeStringField("queryType", "groupBy");
- generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("granularity", granularity);
- writeField(generator, "dimensions", dimensions);
- writeFieldIf(generator, "limitSpec", limit);
- writeFieldIf(generator, "filter", jsonFilter);
- writeField(generator, "aggregations", aggregations);
- writeFieldIf(generator, "postAggregations", null);
- writeField(generator, "intervals", intervals);
- writeFieldIf(generator, "having", null);
-
- generator.writeEndObject();
- break;
-
- case SELECT:
- generator.writeStartObject();
-
- generator.writeStringField("queryType", "select");
- generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("descending", "false");
- writeField(generator, "intervals", intervals);
- writeFieldIf(generator, "filter", jsonFilter);
- writeField(generator, "dimensions", translator.dimensions);
- writeField(generator, "metrics", translator.metrics);
- generator.writeStringField("granularity", granularity);
-
- generator.writeFieldName("pagingSpec");
- generator.writeStartObject();
- generator.writeNumberField("threshold", fetch != null ? fetch : 1);
- generator.writeEndObject();
-
- generator.writeFieldName("context");
- generator.writeStartObject();
- generator.writeBooleanField(Constants.DRUID_QUERY_FETCH, fetch != null);
- generator.writeEndObject();
-
- generator.writeEndObject();
- break;
-
- default:
- throw new AssertionError("unknown query type " + queryType);
- }
-
- generator.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return new QuerySpec(queryType, sw.toString(), fieldNames);
- }
-
- private JsonAggregation getJsonAggregation(List<String> fieldNames,
- String name, AggregateCall aggCall) {
- final List<String> list = new ArrayList<>();
- for (Integer arg : aggCall.getArgList()) {
- list.add(fieldNames.get(arg));
- }
- final String only = Iterables.getFirst(list, null);
- final boolean b = aggCall.getType().getSqlTypeName() == SqlTypeName.DOUBLE;
- switch (aggCall.getAggregation().getKind()) {
- case COUNT:
- if (aggCall.isDistinct()) {
- return new JsonCardinalityAggregation("cardinality", name, list);
- }
- return new JsonAggregation("count", name, only);
- case SUM:
- case SUM0:
- return new JsonAggregation(b ? "doubleSum" : "longSum", name, only);
- case MIN:
- return new JsonAggregation(b ? "doubleMin" : "longMin", name, only);
- case MAX:
- return new JsonAggregation(b ? "doubleMax" : "longMax", name, only);
- default:
- throw new AssertionError("unknown aggregate " + aggCall);
- }
- }
-
- private static void writeField(JsonGenerator generator, String fieldName,
- Object o) throws IOException {
- generator.writeFieldName(fieldName);
- writeObject(generator, o);
- }
-
- private static void writeFieldIf(JsonGenerator generator, String fieldName,
- Object o) throws IOException {
- if (o != null) {
- writeField(generator, fieldName, o);
- }
- }
-
- private static void writeArray(JsonGenerator generator, List<?> elements)
- throws IOException {
- generator.writeStartArray();
- for (Object o : elements) {
- writeObject(generator, o);
- }
- generator.writeEndArray();
- }
-
- private static void writeObject(JsonGenerator generator, Object o)
- throws IOException {
- if (o instanceof String) {
- String s = (String) o;
- generator.writeString(s);
- } else if (o instanceof Interval) {
- Interval i = (Interval) o;
- generator.writeString(i.toString());
- } else if (o instanceof Integer) {
- Integer i = (Integer) o;
- generator.writeNumber(i);
- } else if (o instanceof List) {
- writeArray(generator, (List<?>) o);
- } else if (o instanceof Json) {
- ((Json) o).write(generator);
- } else {
- throw new AssertionError("not a json object: " + o);
- }
- }
-
- /** Druid query specification. */
- public static class QuerySpec {
- final DruidQueryType queryType;
- final String queryString;
- final List<String> fieldNames;
-
- QuerySpec(DruidQueryType queryType, String queryString,
- List<String> fieldNames) {
- this.queryType = Preconditions.checkNotNull(queryType);
- this.queryString = Preconditions.checkNotNull(queryString);
- this.fieldNames = ImmutableList.copyOf(fieldNames);
- }
-
- @Override public int hashCode() {
- return Objects.hash(queryType, queryString, fieldNames);
- }
-
- @Override public boolean equals(Object obj) {
- return obj == this
- || obj instanceof QuerySpec
- && queryType == ((QuerySpec) obj).queryType
- && queryString.equals(((QuerySpec) obj).queryString)
- && fieldNames.equals(((QuerySpec) obj).fieldNames);
- }
-
- @Override public String toString() {
- return "{queryType: " + queryType
- + ", queryString: " + queryString
- + ", fieldNames: " + fieldNames + "}";
- }
-
- String getQueryString(String pagingIdentifier, int offset) {
- if (pagingIdentifier == null) {
- return queryString;
- }
- return queryString.replace("\"threshold\":",
- "\"pagingIdentifiers\":{\"" + pagingIdentifier + "\":" + offset
- + "},\"threshold\":");
- }
- }
-
- /** Translates scalar expressions to Druid field references. */
- private static class Translator {
- final List<String> dimensions = new ArrayList<>();
- final List<String> metrics = new ArrayList<>();
- final DruidTable druidTable;
- final RelDataType rowType;
-
- Translator(DruidTable druidTable, RelDataType rowType) {
- this.druidTable = druidTable;
- this.rowType = rowType;
- for (RelDataTypeField f : rowType.getFieldList()) {
- final String fieldName = f.getName();
- if (druidTable.metricFieldNames.contains(fieldName)) {
- metrics.add(fieldName);
- } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
- dimensions.add(fieldName);
- }
- }
- }
-
- String translate(RexNode e, boolean set) {
- switch (e.getKind()) {
- case INPUT_REF:
- final RexInputRef ref = (RexInputRef) e;
- final String fieldName =
- rowType.getFieldList().get(ref.getIndex()).getName();
- if (set) {
- if (druidTable.metricFieldNames.contains(fieldName)) {
- metrics.add(fieldName);
- } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
- dimensions.add(fieldName);
- }
- }
- return fieldName;
-
- case CAST:
- return tr(e, 0, set);
-
- case LITERAL:
- return ((RexLiteral) e).getValue2().toString();
-
- case OTHER_FUNCTION:
- final RexCall call = (RexCall) e;
- assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator());
- return tr(call, 0, set);
-
- default:
- throw new AssertionError("invalid expression " + e);
- }
- }
-
- @SuppressWarnings("incomplete-switch")
- private JsonFilter translateFilter(RexNode e) {
- RexCall call;
- switch (e.getKind()) {
- case EQUALS:
- case NOT_EQUALS:
- case GREATER_THAN:
- case GREATER_THAN_OR_EQUAL:
- case LESS_THAN:
- case LESS_THAN_OR_EQUAL:
- call = (RexCall) e;
- int posRef;
- int posConstant;
- if (RexUtil.isConstant(call.getOperands().get(1))) {
- posRef = 0;
- posConstant = 1;
- } else if (RexUtil.isConstant(call.getOperands().get(0))) {
- posRef = 1;
- posConstant = 0;
- } else {
- throw new AssertionError("it is not a valid comparison: " + e);
- }
- switch (e.getKind()) {
- case EQUALS:
- return new JsonSelector("selector", tr(e, posRef), tr(e, posConstant));
- case NOT_EQUALS:
- return new JsonCompositeFilter("not",
- ImmutableList.of(new JsonSelector("selector", tr(e, posRef), tr(e, posConstant))));
- case GREATER_THAN:
- return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), true, null, false,
- false);
- case GREATER_THAN_OR_EQUAL:
- return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), false, null, false,
- false);
- case LESS_THAN:
- return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), true,
- false);
- case LESS_THAN_OR_EQUAL:
- return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), false,
- false);
- }
- case AND:
- case OR:
- case NOT:
- call = (RexCall) e;
- return new JsonCompositeFilter(e.getKind().toString().toLowerCase(),
- translateFilters(call.getOperands()));
- default:
- throw new AssertionError("cannot translate filter: " + e);
- }
- }
-
- private String tr(RexNode call, int index) {
- return tr(call, index, false);
- }
-
- private String tr(RexNode call, int index, boolean set) {
- return translate(((RexCall) call).getOperands().get(index), set);
- }
-
- private List<JsonFilter> translateFilters(List<RexNode> operands) {
- final ImmutableList.Builder<JsonFilter> builder =
- ImmutableList.builder();
- for (RexNode operand : operands) {
- builder.add(translateFilter(operand));
- }
- return builder.build();
- }
- }
-
- /** Object that knows how to write itself to a
- * {@link com.fasterxml.jackson.core.JsonGenerator}. */
- private interface Json {
- void write(JsonGenerator generator) throws IOException;
- }
-
- /** Aggregation element of a Druid "groupBy" or "topN" query. */
- private static class JsonAggregation implements Json {
- final String type;
- final String name;
- final String fieldName;
-
- private JsonAggregation(String type, String name, String fieldName) {
- this.type = type;
- this.name = name;
- this.fieldName = fieldName;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- generator.writeStringField("name", name);
- writeFieldIf(generator, "fieldName", fieldName);
- generator.writeEndObject();
- }
- }
-
- /** Collation element of a Druid "groupBy" query. */
- private static class JsonLimit implements Json {
- final String type;
- final Integer limit;
- final ImmutableList<JsonCollation> collations;
-
- private JsonLimit(String type, Integer limit, ImmutableList<JsonCollation> collations) {
- this.type = type;
- this.limit = limit;
- this.collations = collations;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- writeFieldIf(generator, "limit", limit);
- writeFieldIf(generator, "columns", collations);
- generator.writeEndObject();
- }
- }
-
- /** Collation element of a Druid "groupBy" query. */
- private static class JsonCollation implements Json {
- final String dimension;
- final String direction;
-
- private JsonCollation(String dimension, String direction) {
- this.dimension = dimension;
- this.direction = direction;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("dimension", dimension);
- writeFieldIf(generator, "direction", direction);
- generator.writeEndObject();
- }
- }
-
- /** Aggregation element that calls the "cardinality" function. */
- private static class JsonCardinalityAggregation extends JsonAggregation {
- final List<String> fieldNames;
-
- private JsonCardinalityAggregation(String type, String name,
- List<String> fieldNames) {
- super(type, name, null);
- this.fieldNames = fieldNames;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- generator.writeStringField("name", name);
- writeFieldIf(generator, "fieldNames", fieldNames);
- generator.writeEndObject();
- }
- }
-
- /** Filter element of a Druid "groupBy" or "topN" query. */
- private abstract static class JsonFilter implements Json {
- final String type;
-
- private JsonFilter(String type) {
- this.type = type;
- }
- }
-
- /** Equality filter. */
- private static class JsonSelector extends JsonFilter {
- private final String dimension;
- private final String value;
-
- private JsonSelector(String type, String dimension, String value) {
- super(type);
- this.dimension = dimension;
- this.value = value;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- generator.writeStringField("dimension", dimension);
- generator.writeStringField("value", value);
- generator.writeEndObject();
- }
- }
-
- /** Bound filter. */
- private static class JsonBound extends JsonFilter {
- private final String dimension;
- private final String lower;
- private final boolean lowerStrict;
- private final String upper;
- private final boolean upperStrict;
- private final boolean alphaNumeric;
-
- private JsonBound(String type, String dimension, String lower,
- boolean lowerStrict, String upper, boolean upperStrict,
- boolean alphaNumeric) {
- super(type);
- this.dimension = dimension;
- this.lower = lower;
- this.lowerStrict = lowerStrict;
- this.upper = upper;
- this.upperStrict = upperStrict;
- this.alphaNumeric = alphaNumeric;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- generator.writeStringField("dimension", dimension);
- if (lower != null) {
- generator.writeStringField("lower", lower);
- generator.writeBooleanField("lowerStrict", lowerStrict);
- }
- if (upper != null) {
- generator.writeStringField("upper", upper);
- generator.writeBooleanField("upperStrict", upperStrict);
- }
- generator.writeBooleanField("alphaNumeric", alphaNumeric);
- generator.writeEndObject();
- }
- }
-
- /** Filter that combines other filters using a boolean operator. */
- private static class JsonCompositeFilter extends JsonFilter {
- private final List<? extends JsonFilter> fields;
-
- private JsonCompositeFilter(String type,
- List<? extends JsonFilter> fields) {
- super(type);
- this.fields = fields;
- }
-
- public void write(JsonGenerator generator) throws IOException {
- generator.writeStartObject();
- generator.writeStringField("type", type);
- switch (type) {
- case "NOT":
- writeField(generator, "field", fields.get(0));
- break;
- default:
- writeField(generator, "fields", fields);
- }
- generator.writeEndObject();
- }
- }
-
-}
-
-// End DruidQuery.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
deleted file mode 100644
index 228b307..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-/**
- * Type of Druid query.
- *
- * TODO: to be removed when Calcite is upgraded to 1.9
- */
-public enum DruidQueryType {
- SELECT("select"),
- TOP_N("topN"),
- GROUP_BY("groupBy"),
- TIMESERIES("timeseries");
-
- private final String queryName;
-
- private DruidQueryType(String queryName) {
- this.queryName = queryName;
- }
-
- public String getQueryName() {
- return this.queryName;
- }
-}
-
-// End DruidQueryType.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
deleted file mode 100644
index f68ffa5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
+++ /dev/null
@@ -1,591 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortProjectTransposeRule;
-import org.joda.time.Interval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Rules and relational operators for {@link DruidQuery}.
- *
- * TODO: to be removed when Calcite is upgraded to 1.9
- */
-public class DruidRules {
-
- protected static final Logger LOG = LoggerFactory.getLogger(DruidRules.class);
-
- // Avoid instantiation
- private DruidRules() {
- }
-
- public static final DruidFilterRule FILTER = new DruidFilterRule();
- public static final DruidProjectRule PROJECT = new DruidProjectRule();
- public static final DruidAggregateRule AGGREGATE = new DruidAggregateRule();
- public static final DruidProjectAggregateRule PROJECT_AGGREGATE = new DruidProjectAggregateRule();
- public static final DruidSortRule SORT = new DruidSortRule();
- public static final DruidProjectSortRule PROJECT_SORT = new DruidProjectSortRule();
- public static final DruidSortProjectRule SORT_PROJECT = new DruidSortProjectRule();
-
- /** Predicate that returns whether Druid cannot handle an aggregate. */
- private static final Predicate<AggregateCall> BAD_AGG = new Predicate<AggregateCall>() {
- public boolean apply(AggregateCall aggregateCall) {
- switch (aggregateCall.getAggregation().getKind()) {
- case COUNT:
- case SUM:
- case SUM0:
- case MIN:
- case MAX:
- return false;
- default:
- return true;
- }
- }
- };
-
- /**
- * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}.
- */
- private static class DruidFilterRule extends RelOptRule {
- private DruidFilterRule() {
- super(operand(Filter.class,
- operand(DruidQuery.class, none())));
- }
-
- public void onMatch(RelOptRuleCall call) {
- final Filter filter = call.rel(0);
- final DruidQuery query = call.rel(1);
- if (!DruidQuery.isValidSignature(query.signature() + 'f')
- || !query.isValidFilter(filter.getCondition())) {
- return;
- }
- // Timestamp
- int timestampFieldIdx = -1;
- for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
- if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(
- query.getRowType().getFieldList().get(i).getName())) {
- timestampFieldIdx = i;
- break;
- }
- }
- final Pair<List<RexNode>, List<RexNode>> pair = splitFilters(
- filter.getCluster().getRexBuilder(), query, filter.getCondition(), timestampFieldIdx);
- if (pair == null) {
- // We can't push anything useful to Druid.
- return;
- }
- List<Interval> intervals = null;
- if (!pair.left.isEmpty()) {
- intervals = DruidIntervalUtils.createInterval(
- query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
- pair.left);
- if (intervals == null) {
- // We can't push anything useful to Druid.
- return;
- }
- }
- DruidQuery newDruidQuery = query;
- if (!pair.right.isEmpty()) {
- if (!validConditions(pair.right)) {
- return;
- }
- final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
- RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), pair.right, false));
- newDruidQuery = DruidQuery.extendQuery(query, newFilter);
- }
- if (intervals != null) {
- newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals);
- }
- call.transformTo(newDruidQuery);
- }
-
- /* Splits the filter condition into two groups: those that filter on the timestamp column
- * and those that filter on other fields */
- private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder,
- final DruidQuery input, RexNode cond, final int timestampFieldIdx) {
- final List<RexNode> timeRangeNodes = new ArrayList<>();
- final List<RexNode> otherNodes = new ArrayList<>();
- List<RexNode> conjs = RelOptUtil.conjunctions(cond);
- if (conjs.isEmpty()) {
- // We do not transform
- return null;
- }
- // Number of columns with the dimensions and timestamp
- int max = input.getRowType().getFieldCount() - input.druidTable.metricFieldNames.size();
- for (RexNode conj : conjs) {
- final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
- conj.accept(visitor);
- if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
- if (visitor.inputPosReferenced.size() != 1) {
- // Complex predicate, transformation currently not supported
- return null;
- }
- timeRangeNodes.add(conj);
- } else if (!visitor.inputPosReferenced.tailSet(max).isEmpty()) {
- // Filter on metrics, not supported in Druid
- return null;
- } else {
- otherNodes.add(conj);
- }
- }
- return Pair.of(timeRangeNodes, otherNodes);
- }
-
- /* Checks that all conditions are on ref + literal*/
- private static boolean validConditions(List<RexNode> nodes) {
- for (RexNode node: nodes) {
- try {
- node.accept(
- new RexVisitorImpl<Void>(true) {
- @SuppressWarnings("incomplete-switch")
- @Override public Void visitCall(RexCall call) {
- switch (call.getKind()) {
- case CAST:
- // Only if on top of ref or literal
- if (call.getOperands().get(0) instanceof RexInputRef ||
- call.getOperands().get(0) instanceof RexLiteral) {
- break;
- }
- // Not supported
- throw Util.FoundOne.NULL;
- case EQUALS:
- case LESS_THAN:
- case LESS_THAN_OR_EQUAL:
- case GREATER_THAN:
- case GREATER_THAN_OR_EQUAL:
- // Check cast
- RexNode left = call.getOperands().get(0);
- if (left.getKind() == SqlKind.CAST) {
- left = ((RexCall)left).getOperands().get(0);
- }
- RexNode right = call.getOperands().get(1);
- if (right.getKind() == SqlKind.CAST) {
- right = ((RexCall)right).getOperands().get(0);
- }
- if (left instanceof RexInputRef && right instanceof RexLiteral) {
- break;
- }
- if (right instanceof RexInputRef && left instanceof RexLiteral) {
- break;
- }
- // Not supported if it is not ref + literal
- throw Util.FoundOne.NULL;
- case BETWEEN:
- case IN:
- // Not supported here yet
- throw Util.FoundOne.NULL;
- }
- return super.visitCall(call);
- }
- });
- } catch (Util.FoundOne e) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
- */
- private static class DruidProjectRule extends RelOptRule {
- private DruidProjectRule() {
- super(operand(Project.class,
- operand(DruidQuery.class, none())));
- }
-
- public void onMatch(RelOptRuleCall call) {
- final Project project = call.rel(0);
- final DruidQuery query = call.rel(1);
- if (!DruidQuery.isValidSignature(query.signature() + 'p')) {
- return;
- }
-
- if (canProjectAll(project.getProjects())) {
- // All expressions can be pushed to Druid in their entirety.
- final RelNode newProject = project.copy(project.getTraitSet(),
- ImmutableList.of(Util.last(query.rels)));
- RelNode newNode = DruidQuery.extendQuery(query, newProject);
- call.transformTo(newNode);
- return;
- }
- final Pair<List<RexNode>, List<RexNode>> pair = splitProjects(
- project.getCluster().getRexBuilder(), query, project.getProjects());
- if (pair == null) {
- // We can't push anything useful to Druid.
- return;
- }
- final List<RexNode> above = pair.left;
- final List<RexNode> below = pair.right;
- final RelDataTypeFactory.FieldInfoBuilder builder = project.getCluster().getTypeFactory()
- .builder();
- final RelNode input = Util.last(query.rels);
- for (RexNode e : below) {
- final String name;
- if (e instanceof RexInputRef) {
- name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex());
- } else {
- name = null;
- }
- builder.add(name, e.getType());
- }
- final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build());
- final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
- final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above,
- project.getRowType());
- call.transformTo(newProject2);
- }
-
- private static boolean canProjectAll(List<RexNode> nodes) {
- for (RexNode e : nodes) {
- if (!(e instanceof RexInputRef)) {
- return false;
- }
- }
- return true;
- }
-
- private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder,
- final RelNode input, List<RexNode> nodes) {
- final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
- for (RexNode node : nodes) {
- node.accept(visitor);
- }
- if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) {
- // All inputs are referenced
- return null;
- }
- final List<RexNode> belowNodes = new ArrayList<>();
- final List<RelDataType> belowTypes = new ArrayList<>();
- final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced);
- for (int i : positions) {
- final RexNode node = rexBuilder.makeInputRef(input, i);
- belowNodes.add(node);
- belowTypes.add(node.getType());
- }
- final List<RexNode> aboveNodes = new ArrayList<>();
- for (RexNode node : nodes) {
- aboveNodes.add(node.accept(new RexShuttle() {
- @Override
- public RexNode visitInputRef(RexInputRef ref) {
- final int index = positions.indexOf(ref.getIndex());
- return rexBuilder.makeInputRef(belowTypes.get(index), index);
- }
- }));
- }
- return Pair.of(aboveNodes, belowNodes);
- }
- }
-
- /**
- * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}.
- */
- private static class DruidAggregateRule extends RelOptRule {
- private DruidAggregateRule() {
- super(operand(Aggregate.class,
- operand(DruidQuery.class, none())));
- }
-
- public void onMatch(RelOptRuleCall call) {
- final Aggregate aggregate = call.rel(0);
- final DruidQuery query = call.rel(1);
- if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
- return;
- }
- if (aggregate.indicator
- || aggregate.getGroupSets().size() != 1
- || Iterables.any(aggregate.getAggCallList(), BAD_AGG)
- || !validAggregate(aggregate, query)) {
- return;
- }
- final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
- ImmutableList.of(Util.last(query.rels)));
- call.transformTo(DruidQuery.extendQuery(query, newAggregate));
- }
-
- /* Returns true iff no aggregate function argument references the timestamp column */
- private static boolean validAggregate(Aggregate aggregate, DruidQuery query) {
- ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- builder.addAll(aggCall.getArgList());
- }
- return !checkTimestampRefOnQuery(builder.build(), query.getTopNode());
- }
- }
-
- /**
- * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and
- * {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
- */
- private static class DruidProjectAggregateRule extends RelOptRule {
- private DruidProjectAggregateRule() {
- super(operand(Aggregate.class,
- operand(Project.class,
- operand(DruidQuery.class, none()))));
- }
-
- public void onMatch(RelOptRuleCall call) {
- final Aggregate aggregate = call.rel(0);
- final Project project = call.rel(1);
- final DruidQuery query = call.rel(2);
- if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
- return;
- }
- int timestampIdx;
- if ((timestampIdx = validProject(project, query)) == -1) {
- return;
- }
- if (aggregate.indicator
- || aggregate.getGroupSets().size() != 1
- || Iterables.any(aggregate.getAggCallList(), BAD_AGG)
- || !validAggregate(aggregate, timestampIdx)) {
- return;
- }
-
- final RelNode newProject = project.copy(project.getTraitSet(),
- ImmutableList.of(Util.last(query.rels)));
- final DruidQuery projectDruidQuery = DruidQuery.extendQuery(query, newProject);
- final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
- ImmutableList.of(Util.last(projectDruidQuery.rels)));
- call.transformTo(DruidQuery.extendQuery(projectDruidQuery, newAggregate));
- }
-
- /* A valid Project contains only input references and at most one use of the
- * timestamp column, either direct or wrapped in a HiveDateGranularity function call.
- * Returns the position of that use in the project, or -1 if invalid or absent. */
- private static int validProject(Project project, DruidQuery query) {
- List<RexNode> nodes = project.getProjects();
- int idxTimestamp = -1;
- for (int i = 0; i < nodes.size(); i++) {
- final RexNode e = nodes.get(i);
- if (e instanceof RexCall) {
- // It is a call, check that it is EXTRACT and follow-up conditions
- final RexCall call = (RexCall) e;
- if (!HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) {
- return -1;
- }
- if (idxTimestamp != -1) {
- // Already one usage of timestamp column
- return -1;
- }
- if (!(call.getOperands().get(0) instanceof RexInputRef)) {
- return -1;
- }
- final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
- if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode()))) {
- return -1;
- }
- idxTimestamp = i;
- continue;
- }
- if (!(e instanceof RexInputRef)) {
- // It needs to be a reference
- return -1;
- }
- final RexInputRef ref = (RexInputRef) e;
- if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()), query.getTopNode())) {
- if (idxTimestamp != -1) {
- // Already one usage of timestamp column
- return -1;
- }
- idxTimestamp = i;
- }
- }
- return idxTimestamp;
- }
-
- private static boolean validAggregate(Aggregate aggregate, int idx) {
- if (!aggregate.getGroupSet().get(idx)) {
- return false;
- }
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- if (aggCall.getArgList().contains(idx)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * Rule to push a {@link org.apache.calcite.rel.core.Sort} through a
- * {@link org.apache.calcite.rel.core.Project}. Useful to transform
- * to complex Druid queries.
- */
- private static class DruidProjectSortRule extends HiveSortProjectTransposeRule {
- private DruidProjectSortRule() {
- super(operand(Sort.class,
- operand(Project.class,
- operand(DruidQuery.class, none()))));
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- return true;
- }
-
- }
-
- /**
- * Rule to push back {@link org.apache.calcite.rel.core.Project} through a
- * {@link org.apache.calcite.rel.core.Sort}. Useful if after pushing Sort,
- * we could not push it inside DruidQuery.
- */
- private static class DruidSortProjectRule extends HiveProjectSortTransposeRule {
- private DruidSortProjectRule() {
- super(operand(Project.class,
- operand(Sort.class,
- operand(DruidQuery.class, none()))));
- }
- }
-
- /**
- * Rule to push a {@link org.apache.calcite.rel.core.Sort} into a {@link DruidQuery}.
- */
- private static class DruidSortRule extends RelOptRule {
- private DruidSortRule() {
- super(operand(Sort.class,
- operand(DruidQuery.class, none())));
- }
-
- public void onMatch(RelOptRuleCall call) {
- final Sort sort = call.rel(0);
- final DruidQuery query = call.rel(1);
- if (!DruidQuery.isValidSignature(query.signature() + 'l')) {
- return;
- }
- // Either it is:
- // - a sort without limit on the time column on top of
- // Agg operator (transformable to timeseries query), or
- // - it is a sort w/o limit on columns that do not include
- // the time column on top of Agg operator, or
- // - a simple limit on top of other operator than Agg
- if (!validSortLimit(sort, query)) {
- return;
- }
- final RelNode newSort = sort.copy(sort.getTraitSet(),
- ImmutableList.of(Util.last(query.rels)));
- call.transformTo(DruidQuery.extendQuery(query, newSort));
- }
-
- /* Checks whether the sort (or limit) can be pushed into the Druid query */
- private static boolean validSortLimit(Sort sort, DruidQuery query) {
- if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
- // offset not supported by Druid
- return false;
- }
- if (query.getTopNode() instanceof Aggregate) {
- final Aggregate topAgg = (Aggregate) query.getTopNode();
- final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder();
- int metricsRefs = 0;
- for (RelFieldCollation col : sort.collation.getFieldCollations()) {
- int idx = col.getFieldIndex();
- if (idx >= topAgg.getGroupCount()) {
- metricsRefs++;
- continue;
- }
- positionsReferenced.set(topAgg.getGroupSet().nth(idx));
- }
- boolean refsTimestamp =
- checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput());
- if (refsTimestamp && metricsRefs != 0) {
- return false;
- }
- return true;
- }
- // If it is going to be a Druid select operator, we push the limit iff
- // 1) it does not contain a sort specification (required by Druid) and
- // 2) limit is smaller than select threshold, as otherwise it might be
- // better to obtain some parallelization and let global limit
- // optimizer kick in
- HiveDruidConf conf = sort.getCluster().getPlanner()
- .getContext().unwrap(HiveDruidConf.class);
- return HiveCalciteUtil.pureLimitRelNode(sort) &&
- RexLiteral.intValue(sort.fetch) <= conf.getSelectThreshold();
- }
- }
-
- /* Check if any of the references leads to the timestamp column */
- private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top) {
- if (top instanceof Project) {
- ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
- final Project project = (Project) top;
- for (int index : set) {
- RexNode node = project.getProjects().get(index);
- if (node instanceof RexInputRef) {
- newSet.set(((RexInputRef)node).getIndex());
- } else if (node instanceof RexCall) {
- RexCall call = (RexCall) node;
- assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator());
- newSet.set(((RexInputRef)call.getOperands().get(0)).getIndex());
- }
- }
- top = project.getInput();
- set = newSet.build();
- }
-
- // Check if any references the timestamp column
- for (int index : set) {
- if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(top.getRowType().getFieldNames().get(index))) {
- return true;
- }
- }
-
- return false;
- }
-
-}
-
-// End DruidRules.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
deleted file mode 100644
index 3b3f68a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-import java.util.Map;
-
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AbstractSchema;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Schema mapped onto a Druid instance.
- *
- * TODO: to be removed when Calcite is upgraded to 1.9
- */
-public class DruidSchema extends AbstractSchema {
- final String url;
-
- /**
- * Creates a Druid schema.
- *
- * @param url URL of query REST service
- */
- public DruidSchema(String url) {
- this.url = Preconditions.checkNotNull(url);
- }
-
- @Override protected Map<String, Table> getTableMap() {
- final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
- return builder.build();
- }
-}
-
-// End DruidSchema.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
deleted file mode 100644
index 7288291..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.calcite.interpreter.BindableConvention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.schema.impl.AbstractTable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * Table mapped onto a Druid table.
- *
- * TODO: to be removed when Calcite is upgraded to 1.9
- */
-public class DruidTable extends AbstractTable implements TranslatableTable {
-
- public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
- public static final Interval DEFAULT_INTERVAL = new Interval(
- new DateTime("1900-01-01"),
- new DateTime("3000-01-01")
- );
-
- final DruidSchema schema;
- final String dataSource;
- final RelDataType rowType;
- final RelProtoDataType protoRowType;
- final ImmutableSet<String> metricFieldNames;
- final ImmutableList<Interval> intervals;
- final String timestampFieldName;
-
- /**
- * Creates a Druid table.
- *
- * @param schema Druid schema that contains this table
- * @param dataSource Druid data source name
- * @param protoRowType Field names and types
- * @param metricFieldNames Names of fields that are metrics
- * @param intervals Default intervals if query does not constrain the time
- * @param timestampFieldName Name of the column that contains the time
- */
- public DruidTable(DruidSchema schema, String dataSource,
- RelProtoDataType protoRowType, Set<String> metricFieldNames,
- List<Interval> intervals, String timestampFieldName) {
- this.schema = Preconditions.checkNotNull(schema);
- this.dataSource = Preconditions.checkNotNull(dataSource);
- this.rowType = null;
- this.protoRowType = protoRowType;
- this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
- this.intervals = ImmutableList.copyOf(intervals);
- this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
- }
-
- public DruidTable(DruidSchema schema, String dataSource,
- RelDataType rowType, Set<String> metricFieldNames,
- List<Interval> intervals, String timestampFieldName) {
- this.schema = Preconditions.checkNotNull(schema);
- this.dataSource = Preconditions.checkNotNull(dataSource);
- this.rowType = Preconditions.checkNotNull(rowType);
- this.protoRowType = null;
- this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
- this.intervals = ImmutableList.copyOf(intervals);
- this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
- }
-
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- final RelDataType thisRowType;
- if (rowType != null) {
- thisRowType = rowType;
- } else {
- // Generate
- thisRowType = protoRowType.apply(typeFactory);
- }
- final List<String> fieldNames = thisRowType.getFieldNames();
- Preconditions.checkArgument(fieldNames.contains(timestampFieldName));
- Preconditions.checkArgument(fieldNames.containsAll(metricFieldNames));
- return thisRowType;
- }
-
- public RelNode toRel(RelOptTable.ToRelContext context,
- RelOptTable relOptTable) {
- final RelOptCluster cluster = context.getCluster();
- final TableScan scan = LogicalTableScan.create(cluster, relOptTable);
- return DruidQuery.create(cluster,
- cluster.traitSetOf(BindableConvention.INSTANCE), relOptTable, this,
- ImmutableList.<RelNode>of(scan));
- }
-
-}
-
-// End DruidTable.java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
deleted file mode 100644
index 0686dff..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
-
-public class HiveDruidConf {
-
- private int selectThreshold;
-
-
- public HiveDruidConf(int selectThreshold) {
- this.selectThreshold = selectThreshold;
- }
-
- public int getSelectThreshold() {
- return selectThreshold;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
index dc6b152..6df6026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
@@ -35,7 +35,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.IntList;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
import com.google.common.collect.Sets;
@@ -90,7 +89,7 @@ public class HiveAggregate extends Aggregate implements HiveRelNode {
final RelDataType inputRowType, boolean indicator,
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
final List<AggregateCall> aggCalls) {
- final IntList groupList = groupSet.toList();
+ final List<Integer> groupList = groupSet.asList();
assert groupList.size() == groupSet.cardinality();
final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
final List<RelDataTypeField> fieldList = inputRowType.getFieldList();
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
deleted file mode 100644
index b3f8d9b..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
-
-import java.util.Set;
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-
-import com.google.common.collect.Sets;
-
-public class HiveDateGranularity extends SqlFunction {
-
- public static final SqlFunction YEAR = new HiveDateGranularity("YEAR");
- public static final SqlFunction QUARTER = new HiveDateGranularity("QUARTER");
- public static final SqlFunction MONTH = new HiveDateGranularity("MONTH");
- public static final SqlFunction WEEK = new HiveDateGranularity("WEEK");
- public static final SqlFunction DAY = new HiveDateGranularity("DAY");
- public static final SqlFunction HOUR = new HiveDateGranularity("HOUR");
- public static final SqlFunction MINUTE = new HiveDateGranularity("MINUTE");
- public static final SqlFunction SECOND = new HiveDateGranularity("SECOND");
-
- public static final Set<SqlFunction> ALL_FUNCTIONS =
- Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND);
-
- private HiveDateGranularity(String name) {
- super(
- name,
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.TIME_NULLABLE,
- null,
- OperandTypes.ANY,
- SqlFunctionCategory.TIMEDATE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java
new file mode 100644
index 0000000..4edc4df
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExtractDate.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import java.util.Set;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+
+import com.google.common.collect.Sets;
+
+public class HiveExtractDate extends SqlFunction {
+
+ public static final SqlFunction YEAR = new HiveExtractDate("YEAR");
+ public static final SqlFunction QUARTER = new HiveExtractDate("QUARTER");
+ public static final SqlFunction MONTH = new HiveExtractDate("MONTH");
+ public static final SqlFunction WEEK = new HiveExtractDate("WEEK");
+ public static final SqlFunction DAY = new HiveExtractDate("DAY");
+ public static final SqlFunction HOUR = new HiveExtractDate("HOUR");
+ public static final SqlFunction MINUTE = new HiveExtractDate("MINUTE");
+ public static final SqlFunction SECOND = new HiveExtractDate("SECOND");
+
+ public static final Set<SqlFunction> ALL_FUNCTIONS =
+ Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND);
+
+ private HiveExtractDate(String name) {
+ super(name, SqlKind.EXTRACT, ReturnTypes.INTEGER_NULLABLE, null,
+ OperandTypes.INTERVALINTERVAL_INTERVALDATETIME,
+ SqlFunctionCategory.SYSTEM);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java
new file mode 100644
index 0000000..3d104ef
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFloorDate.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import java.util.Set;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.fun.SqlMonotonicUnaryFunction;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+import com.google.common.collect.Sets;
+
+public class HiveFloorDate extends SqlMonotonicUnaryFunction {
+
+ public static final SqlFunction YEAR = new HiveFloorDate("FLOOR_YEAR");
+ public static final SqlFunction QUARTER = new HiveFloorDate("FLOOR_QUARTER");
+ public static final SqlFunction MONTH = new HiveFloorDate("FLOOR_MONTH");
+ public static final SqlFunction WEEK = new HiveFloorDate("FLOOR_WEEK");
+ public static final SqlFunction DAY = new HiveFloorDate("FLOOR_DAY");
+ public static final SqlFunction HOUR = new HiveFloorDate("FLOOR_HOUR");
+ public static final SqlFunction MINUTE = new HiveFloorDate("FLOOR_MINUTE");
+ public static final SqlFunction SECOND = new HiveFloorDate("FLOOR_SECOND");
+
+ public static final Set<SqlFunction> ALL_FUNCTIONS =
+ Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND);
+
+ private HiveFloorDate(String name) {
+ super(name, SqlKind.FLOOR, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null,
+ OperandTypes.sequence(
+ "'" + SqlKind.FLOOR + "(<DATE> TO <TIME_UNIT>)'\n"
+ + "'" + SqlKind.FLOOR + "(<TIME> TO <TIME_UNIT>)'\n"
+ + "'" + SqlKind.FLOOR + "(<TIMESTAMP> TO <TIME_UNIT>)'",
+ OperandTypes.DATETIME,
+ OperandTypes.ANY),
+ SqlFunctionCategory.NUMERIC);
+ }
+
+ @Override
+ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
+ // Monotonic iff its first argument is, but not strict.
+ return call.getOperandMonotonicity(0).unstrict();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java
index e9a4d88..87e755c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateJoinTransposeRule.java
@@ -133,9 +133,10 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule {
// Split join condition
final List<Integer> leftKeys = Lists.newArrayList();
final List<Integer> rightKeys = Lists.newArrayList();
+ final List<Boolean> filterNulls = Lists.newArrayList();
RexNode nonEquiConj =
RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(),
- join.getCondition(), leftKeys, rightKeys);
+ join.getCondition(), leftKeys, rightKeys, filterNulls);
// If it contains non-equi join conditions, we bail out
if (!nonEquiConj.isAlwaysTrue()) {
return;
@@ -271,7 +272,8 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule {
RelOptUtil.areRowTypesEqual(r.getRowType(), aggregate.getRowType(), false)) {
// no need to aggregate
} else {
- r = RelOptUtil.createProject(r, projects, null, true, projectFactory);
+ r = RelOptUtil.createProject(r, projects, null, true,
+ relBuilderFactory.create(aggregate.getCluster(), null));
if (allColumnsInAggregate) {
// let's see if we can convert
List<RexNode> projects2 = new ArrayList<>();
@@ -290,7 +292,8 @@ public class HiveAggregateJoinTransposeRule extends AggregateJoinTransposeRule {
if (projects2.size()
== aggregate.getGroupSet().cardinality() + newAggCalls.size()) {
// We successfully converted agg calls into projects.
- r = RelOptUtil.createProject(r, projects2, null, true, projectFactory);
+ r = RelOptUtil.createProject(r, projects2, null, true,
+ relBuilderFactory.create(aggregate.getCluster(), null));
break b;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b597ab2a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java
index 8af8a0d..c243266 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAggregateProjectMergeRule.java
@@ -141,7 +141,8 @@ public class HiveAggregateProjectMergeRule extends RelOptRule {
i < newAggregate.getRowType().getFieldCount(); i++) {
posList.add(i);
}
- rel = HiveRelOptUtil.createProject(HiveRelFactories.HIVE_PROJECT_FACTORY,
+ rel = HiveRelOptUtil.createProject(
+ HiveRelFactories.HIVE_BUILDER.create(aggregate.getCluster(), null),
rel, posList);
}