You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2015/05/20 18:01:53 UTC

[34/50] [abbrv] hive git commit: HIVE-10627: CBO - Queries fail with Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez via Laljo John Pullokkaran)

HIVE-10627: CBO - Queries fail with Failed to breakup Windowing invocations into Groups (Jesus Camacho Rodriguez via Laljo John Pullokkaran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4923cee
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4923cee
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4923cee

Branch: refs/heads/parquet
Commit: f4923ceeb3ea8ec544ae55236b65620307ce233b
Parents: c640a38
Author: jpullokk <jp...@apache.org>
Authored: Mon May 18 12:34:14 2015 -0700
Committer: jpullokk <jp...@apache.org>
Committed: Mon May 18 12:34:14 2015 -0700

----------------------------------------------------------------------
 .../calcite/rules/HiveWindowingFixRule.java     | 163 +++++++++++++++++++
 .../calcite/translator/ASTConverter.java        |   2 -
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  11 ++
 3 files changed, 174 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
new file mode 100644
index 0000000..ff203d3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+
+/**
+ * Rule to fix windowing issue when it is done over
+ * aggregation columns (more info in HIVE-10627).
+ *
+ * This rule is applied as a post-processing step after
+ * optimization by Calcite in order to add columns
+ * that may be pruned by RelFieldTrimmer, but are
+ * still needed due to the concrete implementation of
+ * Windowing processing in Hive.
+ */
+public class HiveWindowingFixRule extends RelOptRule {
+
+  public static final HiveWindowingFixRule INSTANCE = new HiveWindowingFixRule();
+
+  private final ProjectFactory projectFactory;
+
+
+  private HiveWindowingFixRule() {
+    super(
+        operand(Project.class,
+            operand(Aggregate.class, any())));
+    this.projectFactory = HiveProject.DEFAULT_PROJECT_FACTORY;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Project project = call.rel(0);
+    Aggregate aggregate = call.rel(1);
+
+    // 1. We go over the expressions in the project operator
+    //    and we separate the windowing nodes that are result
+    //    of an aggregate expression from the rest of nodes
+    final int groupingFields = aggregate.getGroupCount() + aggregate.getIndicatorCount();
+    Set<String> projectExprsDigest = new HashSet<String>();
+    Map<String, RexNode> windowingExprsDigestToNodes = new HashMap<String,RexNode>();
+    for (RexNode r : project.getChildExps()) {
+      if (r instanceof RexOver) {
+        RexOver rexOverNode = (RexOver) r;
+        // Operands
+        for (RexNode operand : rexOverNode.getOperands()) {
+          if (operand instanceof RexInputRef &&
+                  ((RexInputRef)operand).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(operand.toString(), operand);
+          }
+        }
+        // Partition keys
+        for (RexNode partitionKey : rexOverNode.getWindow().partitionKeys) {
+          if (partitionKey instanceof RexInputRef &&
+                  ((RexInputRef)partitionKey).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(partitionKey.toString(), partitionKey);
+          }
+        }
+        // Order keys
+        for (RexFieldCollation orderKey : rexOverNode.getWindow().orderKeys) {
+          if (orderKey.left instanceof RexInputRef &&
+                  ((RexInputRef)orderKey.left).getIndex() >= groupingFields) {
+            windowingExprsDigestToNodes.put(orderKey.left.toString(), orderKey.left);
+          }
+        }
+      } else {
+        projectExprsDigest.add(r.toString());
+      }
+    }
+
+    // 2. We check whether there is a column needed by the
+    //    windowing operation that is missing in the
+    //    project expressions. For instance, if the windowing
+    //    operation is over an aggregation column, Hive expects
+    //    that column to be in the Select clause of the query.
+    //    The idea is that if there is a column missing, we will
+    //    replace the old project operator by two new project
+    //    operators:
+    //    - a project operator containing the original columns
+    //      of the project operator plus all the columns that were
+    //      missing
+    //    - a project on top of the previous one, that will take
+    //      out the columns that were missing and were added by the
+    //      previous project
+
+    // These data structures are needed to create the new project
+    // operator (below)
+    final List<RexNode> belowProjectExprs = new ArrayList<RexNode>();
+    final List<String> belowProjectColumnNames = new ArrayList<String>();
+
+    // This data structure is needed to create the new project
+    // operator (top)
+    final List<RexNode> topProjectExprs = new ArrayList<RexNode>();
+
+    final int projectCount = project.getChildExps().size();
+    for (int i = 0; i < projectCount; i++) {
+      belowProjectExprs.add(project.getChildExps().get(i));
+      belowProjectColumnNames.add(project.getRowType().getFieldNames().get(i));
+      topProjectExprs.add(RexInputRef.of(i, project.getRowType()));
+    }
+    boolean windowingFix = false;
+    for (Entry<String, RexNode> windowingExpr : windowingExprsDigestToNodes.entrySet()) {
+      if (!projectExprsDigest.contains(windowingExpr.getKey())) {
+        windowingFix = true;
+        belowProjectExprs.add(windowingExpr.getValue());
+        int colIndex = 0;
+        String alias = "window_col_" + colIndex;
+        while (belowProjectColumnNames.contains(alias)) {
+          alias = "window_col_" + (colIndex++);
+        }
+        belowProjectColumnNames.add(alias);
+      }
+    }
+
+    if (!windowingFix) {
+      // We do not need to do anything, we bail out
+      return;
+    }
+
+    // 3. We need to fix it, we create the two replacement project
+    //    operators
+    RelNode newProjectRel = projectFactory.createProject(
+        aggregate, belowProjectExprs, belowProjectColumnNames);
+    RelNode newTopProjectRel = projectFactory.createProject(
+        newProjectRel, topProjectExprs, project.getRowType().getFieldNames());
+
+    call.transformTo(newTopProjectRel);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 0ada068..95f43d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -54,10 +54,8 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
-import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;

http://git-wip-us.apache.org/repos/asf/hive/blob/f4923cee/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 6e6923c..c412561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4Join
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
@@ -855,6 +856,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
       calciteOptimizedPlan = hepPlanner.findBestExp();
 
+      // run rule to fix windowing issue when it is done over
+      // aggregation columns (HIVE-10627)
+      hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+      hepPgmBldr.addRuleInstance(HiveWindowingFixRule.INSTANCE);
+      hepPlanner = new HepPlanner(hepPgmBldr.build());
+      hepPlanner.registerMetadataProviders(list);
+      cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+      hepPlanner.setRoot(calciteOptimizedPlan);
+      calciteOptimizedPlan = hepPlanner.findBestExp();
+
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
         // run rules to aid in translation from Optiq tree -> Hive tree
         hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);