You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2015/05/20 18:01:53 UTC
[34/50] [abbrv] hive git commit: HIVE-10627: CBO - Queries fail with
Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez
via Laljo John Pullokkaran)
HIVE-10627: CBO - Queries fail with Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez via Laljo John Pullokkaran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4923cee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4923cee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4923cee
Branch: refs/heads/parquet
Commit: f4923ceeb3ea8ec544ae55236b65620307ce233b
Parents: c640a38
Author: jpullokk <jp...@apache.org>
Authored: Mon May 18 12:34:14 2015 -0700
Committer: jpullokk <jp...@apache.org>
Committed: Mon May 18 12:34:14 2015 -0700
----------------------------------------------------------------------
.../calcite/rules/HiveWindowingFixRule.java | 163 +++++++++++++++++++
.../calcite/translator/ASTConverter.java | 2 -
.../hadoop/hive/ql/parse/CalcitePlanner.java | 11 ++
3 files changed, 174 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
new file mode 100644
index 0000000..ff203d3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+
+/**
+ * Rule to fix windowing issue when it is done over
+ * aggregation columns (more info in HIVE-10627).
+ *
+ * <p>This rule is applied as a post-processing step after
+ * optimization by Calcite in order to add columns
+ * that may be pruned by RelFieldTrimmer, but are
+ * still needed due to the concrete implementation of
+ * Windowing processing in Hive.
+ *
+ * <p>The rule matches a {@link Project} sitting directly on top of an
+ * {@link Aggregate}. When a windowing expression ({@link RexOver}) in the
+ * project references an aggregate output column (through one of its
+ * operands, partition keys or order keys) that the project does not expose
+ * as a plain expression of its own, the project is replaced by two projects:
+ * <ul>
+ *   <li>a lower project with the original expressions followed by the
+ *       missing columns, and</li>
+ *   <li>an upper project that keeps only the original columns, so the row
+ *       type seen by the parent operators does not change.</li>
+ * </ul>
+ */
+public class HiveWindowingFixRule extends RelOptRule {
+
+  public static final HiveWindowingFixRule INSTANCE = new HiveWindowingFixRule();
+
+  /** Prefix of the aliases given to the columns added by this rule. */
+  private static final String WINDOW_COL_PREFIX = "window_col_";
+
+  private final ProjectFactory projectFactory;
+
+  private HiveWindowingFixRule() {
+    super(
+        operand(Project.class,
+            operand(Aggregate.class, any())));
+    this.projectFactory = HiveProject.DEFAULT_PROJECT_FACTORY;
+  }
+
+  /**
+   * Replaces the matched project with a pair of projects when some aggregate
+   * column used by a windowing expression is not exposed by the project.
+   * Does nothing (no transformation is registered) when every such column is
+   * already present.
+   *
+   * @param call rule call; {@code rel(0)} is the {@link Project} and
+   *     {@code rel(1)} the {@link Aggregate} below it
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Project project = call.rel(0);
+    final Aggregate aggregate = call.rel(1);
+    final List<RexNode> projectExprs = project.getChildExps();
+    final List<String> projectFieldNames = project.getRowType().getFieldNames();
+
+    // 1. Go over the project expressions. For each windowing expression,
+    //    collect the input refs (in operands, partition keys and order keys)
+    //    whose index is past the group and indicator fields of the aggregate,
+    //    i.e. refs to aggregate output columns. For every other expression,
+    //    remember its digest so we can tell whether a column is already there.
+    final int groupingFields = aggregate.getGroupCount() + aggregate.getIndicatorCount();
+    final Set<String> projectExprsDigest = new HashSet<String>();
+    final Map<String, RexNode> windowingExprsDigestToNodes = new HashMap<String, RexNode>();
+    for (RexNode r : projectExprs) {
+      if (r instanceof RexOver) {
+        final RexOver rexOverNode = (RexOver) r;
+        for (RexNode operand : rexOverNode.getOperands()) {
+          addIfAggregateColumnRef(operand, groupingFields, windowingExprsDigestToNodes);
+        }
+        for (RexNode partitionKey : rexOverNode.getWindow().partitionKeys) {
+          addIfAggregateColumnRef(partitionKey, groupingFields, windowingExprsDigestToNodes);
+        }
+        for (RexFieldCollation orderKey : rexOverNode.getWindow().orderKeys) {
+          addIfAggregateColumnRef(orderKey.left, groupingFields, windowingExprsDigestToNodes);
+        }
+      } else {
+        projectExprsDigest.add(r.toString());
+      }
+    }
+
+    // 2. Find the columns needed by the windowing operations that are missing
+    //    from the project expressions. For instance, if the windowing operation
+    //    is over an aggregation column, Hive expects that column to be in the
+    //    Select clause of the query.
+    final List<RexNode> missingExprs = new ArrayList<RexNode>();
+    for (Entry<String, RexNode> windowingExpr : windowingExprsDigestToNodes.entrySet()) {
+      if (!projectExprsDigest.contains(windowingExpr.getKey())) {
+        missingExprs.add(windowingExpr.getValue());
+      }
+    }
+    if (missingExprs.isEmpty()) {
+      // Nothing is missing: we do not need to do anything, we bail out.
+      return;
+    }
+
+    // 3. Lower project: the original expressions and names, followed by the
+    //    missing columns under fresh "window_col_<n>" aliases.
+    final List<RexNode> belowProjectExprs = new ArrayList<RexNode>(projectExprs);
+    final List<String> belowProjectColumnNames = new ArrayList<String>(projectFieldNames);
+    final Set<String> usedColumnNames = new HashSet<String>(projectFieldNames);
+    int colIndex = 0;
+    for (RexNode missingExpr : missingExprs) {
+      belowProjectExprs.add(missingExpr);
+      // Take the lowest alias index not used yet. The counter is never reset:
+      // every index below it is already taken, so a single forward scan
+      // across all added columns is enough.
+      String alias;
+      do {
+        alias = WINDOW_COL_PREFIX + colIndex++;
+      } while (!usedColumnNames.add(alias));
+      belowProjectColumnNames.add(alias);
+    }
+
+    // 4. Upper project: takes out the columns added in step 3, so the
+    //    replacement has the same row type as the matched project.
+    final List<RexNode> topProjectExprs = new ArrayList<RexNode>(projectExprs.size());
+    for (int i = 0; i < projectExprs.size(); i++) {
+      topProjectExprs.add(RexInputRef.of(i, project.getRowType()));
+    }
+
+    final RelNode newProjectRel = projectFactory.createProject(
+        aggregate, belowProjectExprs, belowProjectColumnNames);
+    final RelNode newTopProjectRel = projectFactory.createProject(
+        newProjectRel, topProjectExprs, projectFieldNames);
+
+    call.transformTo(newTopProjectRel);
+  }
+
+  /**
+   * Records {@code node} in {@code refs}, keyed by its digest, when it is an
+   * input ref whose index is at least {@code groupingFields} (a reference to
+   * an aggregate output column of the matched aggregate).
+   *
+   * @param node expression to inspect
+   * @param groupingFields number of group plus indicator fields of the aggregate
+   * @param refs digest-to-node map that collects the matching refs
+   */
+  private static void addIfAggregateColumnRef(RexNode node, int groupingFields,
+      Map<String, RexNode> refs) {
+    if (node instanceof RexInputRef
+        && ((RexInputRef) node).getIndex() >= groupingFields) {
+      refs.put(node.toString(), node);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 0ada068..95f43d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -54,10 +54,8 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
-import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 6e6923c..c412561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4Join
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
@@ -855,6 +856,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
calciteOptimizedPlan = hepPlanner.findBestExp();
+ // run rule to fix windowing issue when it is done over
+ // aggregation columns (HIVE-10627)
+ hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+ hepPgmBldr.addRuleInstance(HiveWindowingFixRule.INSTANCE);
+ hepPlanner = new HepPlanner(hepPgmBldr.build());
+ hepPlanner.registerMetadataProviders(list);
+ cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+ hepPlanner.setRoot(calciteOptimizedPlan);
+ calciteOptimizedPlan = hepPlanner.findBestExp();
+
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
// run rules to aid in translation from Optiq tree -> Hive tree
hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);