You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/18 11:16:53 UTC

svn commit: r1504395 [2/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/if/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Implementation of Correlation Optimizer. This optimizer is based on
+ * the paper "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
+ * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf).
+ * Correlation Optimizer detects if ReduceSinkOperators share same keys.
+ * Then, it will transform the query plan tree (operator tree) by exploiting
+ * detected correlations. For details, see the original paper of YSmart.
+ *
+ * Test queries associated with this optimizer are correlationoptimizer1.q to
+ * correlationoptimizer14.q
+ */
+public class CorrelationOptimizer implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+
+  // True if correlation optimizer will not try to optimize this query.
+  // Copied from CorrelationNodeProcCtx.isAbort() once the tree walk in transform() is done.
+  private boolean abort;
+
+  // Parse context of the query being optimized; assigned at the start of transform().
+  private ParseContext pCtx;
+
+  // Join operators which may be converted to MapJoin by CommonJoinResolver.
+  // Correlation detection returns no correlated operators when it reaches one of them.
+  private final Set<Operator<? extends OperatorDesc>> skipedJoinOperators;
+
+  /**
+   * Creates an optimizer that has no parse context attached yet, an empty set of
+   * skipped join operators, and the abort flag cleared.
+   */
+  public CorrelationOptimizer() {
+    this.skipedJoinOperators = new HashSet<Operator<? extends OperatorDesc>>();
+    this.pCtx = null;
+    this.abort = false;
+  }
+
+  /**
+   * Guesses which JoinOperators CommonJoinResolver may later convert to MapJoin and
+   * records them in {@code skipedJoinOperators}. Correlation optimizer will not merge
+   * a join recorded here.
+   *
+   * <p>For every JoinOperator in the join context, the length of the table behind each
+   * parent is read from the FileSystem content summary. No guess is made for a join when
+   * {@code CorrelationUtilities.findTableScanOperator} returns null for one of its parents,
+   * when the content summary cannot be obtained, or when there are no big table candidates.
+   *
+   * @throws SemanticException if the parse context has no table registered for a
+   *           TableScanOperator found under a join
+   */
+  private void findPossibleAutoConvertedJoinOperators() throws SemanticException {
+    // Guess if CommonJoinResolver will work. If CommonJoinResolver may
+    // convert a join operation, correlation optimizer will not merge that join.
+    // TODO: If hive.auto.convert.join.noconditionaltask=true, for a JoinOperator
+    // that has both intermediate tables and query input tables as input tables,
+    // we should be able to guess if this JoinOperator will be converted to a MapJoin
+    // based on hive.auto.convert.join.noconditionaltask.size.
+
+    // The threshold does not depend on the join, so read it once.
+    long smallTablesSizeThreshold = HiveConf.getLongVar(pCtx.getConf(),
+        HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
+
+    for (JoinOperator joinOp : pCtx.getJoinContext().keySet()) {
+      boolean isAbleToGuess = true;
+      // Get total size and individual alias's size
+      long aliasTotalKnownInputSize = 0;
+      Map<String, Long> aliasToSize = new HashMap<String, Long>();
+      Map<Integer, String> posToAlias = new HashMap<Integer, String>();
+      List<Operator<? extends OperatorDesc>> parents = joinOp.getParentOperators();
+      for (int pos = 0; pos < parents.size(); pos++) {
+        TableScanOperator tsop = CorrelationUtilities.findTableScanOperator(parents.get(pos));
+        if (tsop == null) {
+          isAbleToGuess = false;
+          break;
+        }
+
+        Table table = pCtx.getTopToTable().get(tsop);
+        if (table == null) {
+          // table should not be null.
+          throw new SemanticException("The table of " +
+              tsop.getName() + " " + tsop.getIdentifier() +
+              " is null, which is not expected.");
+        }
+        String alias = tsop.getConf().getAlias();
+        posToAlias.put(pos, alias);
+
+        Path p = table.getPath();
+        ContentSummary resultCs = null;
+        try {
+          FileSystem fs = p.getFileSystem(pCtx.getConf());
+          resultCs = fs.getContentSummary(p);
+        } catch (IOException e) {
+          // Best effort only: without the size we simply do not guess for this join.
+          // The exception is passed to the logger so that the cause is not lost.
+          LOG.warn("Encounter a error while querying content summary of table " +
+              table.getCompleteName() + " from FileSystem. " +
+              "Cannot guess if CommonJoinOperator will optimize " +
+              joinOp.getName() + " " + joinOp.getIdentifier(), e);
+        }
+        if (resultCs == null) {
+          isAbleToGuess = false;
+          break;
+        }
+
+        long size = resultCs.getLength();
+        aliasTotalKnownInputSize += size;
+        // Several parents may resolve to the same alias; accumulate their sizes.
+        Long knownSize = aliasToSize.get(alias);
+        aliasToSize.put(alias, knownSize == null ? size : knownSize + size);
+      }
+
+      if (!isAbleToGuess) {
+        LOG.info("Cannot guess if CommonJoinOperator will optimize " +
+            joinOp.getName() + " " + joinOp.getIdentifier());
+        continue;
+      }
+
+      JoinDesc joinDesc = joinOp.getConf();
+      Byte[] order = joinDesc.getTagOrder();
+      int numAliases = order.length;
+      HashSet<Integer> bigTableCandidates =
+          MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
+      if (bigTableCandidates == null) {
+        continue;
+      }
+
+      // One convertible big table candidate is enough, so stop at the first one.
+      boolean mayConvert = false;
+      for (int i = 0; i < numAliases && !mayConvert; i++) {
+        // this table cannot be big table
+        if (!bigTableCandidates.contains(i)) {
+          continue;
+        }
+        String bigTableAlias = posToAlias.get(i);
+        mayConvert = !CommonJoinTaskDispatcher.cannotConvert(bigTableAlias, aliasToSize,
+            aliasTotalKnownInputSize, smallTablesSizeThreshold);
+      }
+
+      if (mayConvert) {
+        LOG.info(joinOp.getName() + " " + joinOp.getIdentifier() +
+            " may be converted to MapJoin by CommonJoinResolver");
+        skipedJoinOperators.add(joinOp);
+      }
+    }
+  }
+
+  /**
+   * Detect correlations and transform the query tree.
+   *
+   * <p>When automatic join conversion is enabled, joins that may become MapJoins are
+   * identified first. The operator tree is then walked from the top operators to detect
+   * correlations. If the walk requested an abort, the reasons are logged and the tree is
+   * left unchanged; otherwise every detected correlation is applied.
+   *
+   * @param pctx
+   *          current parse context
+   * @return the parse context that was passed in
+   * @throws SemanticException
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    pCtx = pctx;
+
+    if (HiveConf.getBoolVar(pCtx.getConf(),HiveConf.ConfVars.HIVECONVERTJOIN)) {
+      findPossibleAutoConvertedJoinOperators();
+    }
+
+    // detect correlations
+    CorrelationNodeProcCtx corrCtx = new CorrelationNodeProcCtx(pCtx);
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+        new CorrelationNodeProc());
+
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, corrCtx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topOp nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    // We have finished tree walking (correlation detection).
+    // We will first see if we need to abort (the operator tree has not been changed).
+    // If not, we will start to transform the operator tree.
+    abort = corrCtx.isAbort();
+    if (abort) {
+      LOG.info("Abort. Reasons are ...");
+      for (String reason : corrCtx.getAbortReasons()) {
+        LOG.info("-- " + reason);
+      }
+    } else {
+      // transform the operator tree
+      LOG.info("Begin query plan transformation based on intra-query correlations. " +
+          corrCtx.getCorrelations().size() + " correlation(s) to be applied");
+      for (IntraQueryCorrelation correlation : corrCtx.getCorrelations()) {
+        QueryPlanTreeTransformation.applyCorrelation(pCtx, corrCtx, correlation);
+      }
+    }
+    return pCtx;
+  }
+
+  private class CorrelationNodeProc implements NodeProcessor {
+
+    private void analyzeReduceSinkOperatorsOfJoinOperator(JoinCondDesc[] joinConds,
+        List<Operator<? extends OperatorDesc>> rsOps, Operator<? extends OperatorDesc> curentRsOp,
+        Set<ReduceSinkOperator> correlatedRsOps) {
+      if (correlatedRsOps.contains((ReduceSinkOperator) curentRsOp)) {
+        return;
+      }
+      correlatedRsOps.add((ReduceSinkOperator) curentRsOp);
+
+      int pos = rsOps.indexOf(curentRsOp);
+      for (int i = 0; i < joinConds.length; i++) {
+        JoinCondDesc joinCond = joinConds[i];
+        int type = joinCond.getType();
+        if (pos == joinCond.getLeft()) {
+          if (type == JoinDesc.INNER_JOIN ||
+              type == JoinDesc.LEFT_OUTER_JOIN ||
+              type == JoinDesc.LEFT_SEMI_JOIN) {
+            Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getRight());
+            analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+                correlatedRsOps);
+          }
+        } else if (pos == joinCond.getRight()) {
+          if (type == JoinDesc.INNER_JOIN || type == JoinDesc.RIGHT_OUTER_JOIN) {
+            Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getLeft());
+            analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+                correlatedRsOps);
+          }
+        }
+      }
+    }
+
+    private boolean sameKeys(List<ExprNodeDesc> k1, List<ExprNodeDesc> k2) {
+      if (k1.size() != k2.size()) {
+        return false;
+      }
+      for (int i = 0; i < k1.size(); i++) {
+        ExprNodeDesc expr1 = k1.get(i);
+        ExprNodeDesc expr2 = k2.get(i);
+        if (expr1 == null) {
+          if (expr2 == null) {
+            continue;
+          } else {
+            return false;
+          }
+        } else {
+          if (!expr1.isSame(expr2)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Compares two sorting order strings, ignoring leading and trailing whitespace.
+     * A null string is treated like an empty one.
+     *
+     * @param order1 first order string, may be null
+     * @param order2 second order string, may be null
+     * @return true if both strings are equal after trimming
+     */
+    private boolean sameOrder(String order1, String order2) {
+      String first = (order1 == null) ? "" : order1.trim();
+      String second = (order2 == null) ? "" : order2.trim();
+      return first.equals(second);
+    }
+    /**
+     * This method is used to recursively traverse the tree to find
+     * ReduceSinkOperators which share the same key columns and partitioning
+     * columns. Those ReduceSinkOperators are called correlated ReduceSinkOperators.
+     *
+     * <p>The traversal moves from {@code current} towards its parents. It yields nothing
+     * for a join that may be converted to MapJoin, for an operator without parents and
+     * for a PTFOperator. A UnionOperator is accepted only if every input yields correlated
+     * ReduceSinkOperators. Operators with a ColumnExprMap have the key and partitioning
+     * columns backtracked through them before their parents are visited.
+     *
+     * @param child The child of the current operator
+     * @param childKeyCols The key columns from the child operator
+     * @param childPartitionCols The partitioning columns from the child operator
+     * @param childRSOrder The sorting order of key columns from the child operator
+     * @param current The current operator we are visiting
+     * @param correlation The object keeps tracking the correlation
+     * @return the correlated ReduceSinkOperators found from {@code current}; empty
+     *         (never null) if there is none
+     * @throws SemanticException if a ReduceSinkOperator without ColumnExprMap is reached
+     */
+    private LinkedHashSet<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+        Operator<? extends OperatorDesc> child,
+        List<ExprNodeDesc> childKeyCols, List<ExprNodeDesc> childPartitionCols,
+        String childRSOrder,
+        Operator<? extends OperatorDesc> current,
+        IntraQueryCorrelation correlation) throws SemanticException {
+
+      LOG.info("now detecting operator " + current.getIdentifier() + " " + current.getName());
+
+      LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
+          new LinkedHashSet<ReduceSinkOperator>();
+      if (skipedJoinOperators.contains(current)) {
+        LOG.info(current.getName() + " " + current.getIdentifier() +
+            " may be converted to MapJoin by " +
+            "CommonJoinResolver. Correlation optimizer will not detect correlations " +
+            "involved in this operator");
+        return correlatedReduceSinkOperators;
+      }
+      if (current.getParentOperators() == null) {
+        return correlatedReduceSinkOperators;
+      }
+      if (current instanceof PTFOperator) {
+        // Currently, we do not support PTF operator.
+        LOG.info("Currently, correlation optimizer does not support PTF operator.");
+        return correlatedReduceSinkOperators;
+      }
+      if (current instanceof UnionOperator) {
+        // If we get a UnionOperator, right now, we only handle it when
+        // we can find correlated ReduceSinkOperators from all inputs.
+        LinkedHashSet<ReduceSinkOperator> corrRSs = new LinkedHashSet<ReduceSinkOperator>();
+        for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+          LinkedHashSet<ReduceSinkOperator> tmp =
+              findCorrelatedReduceSinkOperators(
+                  current, childKeyCols, childPartitionCols, childRSOrder, parent, correlation);
+          if (tmp != null && tmp.size() > 0) {
+            corrRSs.addAll(tmp);
+          } else {
+            // one input without correlation: give up on the whole union
+            return correlatedReduceSinkOperators;
+          }
+        }
+        correlatedReduceSinkOperators.addAll(corrRSs);
+        UnionOperator union = (UnionOperator)current;
+        union.getConf().setAllInputsInSameReducer(true);
+      } else if (current.getColumnExprMap() == null && !(current instanceof ReduceSinkOperator)) {
+        // No ColumnExprMap: the columns are passed to the parents without backtracking.
+        for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+          correlatedReduceSinkOperators.addAll(
+              findCorrelatedReduceSinkOperators(
+                  current, childKeyCols, childPartitionCols, childRSOrder, parent, correlation));
+        }
+      } else if (current.getColumnExprMap() != null && !(current instanceof ReduceSinkOperator)) {
+        List<ExprNodeDesc> backtrackedKeyCols =
+            ExprNodeDescUtils.backtrack(childKeyCols, child, current);
+        List<ExprNodeDesc> backtrackedPartitionCols =
+            ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+        // Table aliases which the key columns belong to in the row resolver of current.
+        Set<String> tableNeedToCheck = new HashSet<String>();
+        for (ExprNodeDesc expr: childKeyCols) {
+          if (!(expr instanceof ExprNodeColumnDesc)) {
+            // only plain column references are handled
+            return correlatedReduceSinkOperators;
+          } else {
+            String colName = ((ExprNodeColumnDesc)expr).getColumn();
+            OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
+            for (ColumnInfo cinfo : opCtx.getRowResolver().getColumnInfos()) {
+              if (colName.equals(cinfo.getInternalName())) {
+                tableNeedToCheck.add(cinfo.getTabAlias());
+              }
+            }
+          }
+        }
+        if (current instanceof JoinOperator) {
+          LinkedHashSet<ReduceSinkOperator> correlatedRsOps =
+              new LinkedHashSet<ReduceSinkOperator>();
+          // Only parents which provide one of the tables of the key columns are visited.
+          for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+            Set<String> tableNames =
+                pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames();
+            for (String tbl : tableNames) {
+              if (tableNeedToCheck.contains(tbl)) {
+                correlatedRsOps.addAll(findCorrelatedReduceSinkOperators(
+                    current, backtrackedKeyCols, backtrackedPartitionCols, childRSOrder,
+                    parent, correlation));
+              }
+            }
+          }
+          // If current is JoinOperator, we will stop to traverse the tree
+          // when any of parent ReduceSinkOperator of this JoinOperator is
+          // not considered as a correlated ReduceSinkOperator.
+          if (correlatedRsOps.size() == current.getParentOperators().size()) {
+            correlatedReduceSinkOperators.addAll(correlatedRsOps);
+          } else {
+            correlatedReduceSinkOperators.clear();
+          }
+        } else {
+          for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+            correlatedReduceSinkOperators.addAll(findCorrelatedReduceSinkOperators(
+                current, backtrackedKeyCols, backtrackedPartitionCols, childRSOrder,
+                parent, correlation));
+          }
+        }
+      } else if (current.getColumnExprMap() != null && current instanceof ReduceSinkOperator) {
+        ReduceSinkOperator rsop = (ReduceSinkOperator) current;
+        List<ExprNodeDesc> backtrackedKeyCols =
+            ExprNodeDescUtils.backtrack(childKeyCols, child, current);
+        List<ExprNodeDesc> backtrackedPartitionCols =
+            ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+        List<ExprNodeDesc> rsKeyCols = rsop.getConf().getKeyCols();
+        List<ExprNodeDesc> rsPartitionCols = rsop.getConf().getPartitionCols();
+
+        // Two ReduceSinkOperators are correlated means that
+        // they have same sorting columns (key columns), same partitioning columns,
+        // same sorting orders, and no conflict on the numbers of reducers.
+        // TODO: we should relax this condition
+        // TODO: we need to handle aggregation functions with distinct keyword. In this case,
+        // distinct columns will be added to the key columns.
+        boolean isCorrelated = sameKeys(rsKeyCols, backtrackedKeyCols) &&
+            sameOrder(rsop.getConf().getOrder(), childRSOrder) &&
+            sameKeys(backtrackedPartitionCols, rsPartitionCols) &&
+            correlation.adjustNumReducers(rsop.getConf().getNumReducers());
+        GroupByOperator cGBY =
+            CorrelationUtilities.getSingleChild(rsop, GroupByOperator.class);
+        if (cGBY != null) {
+          if (CorrelationUtilities.hasGroupingSet(rsop) ||
+              cGBY.getConf().isGroupingSetsPresent()) {
+            // Do not support grouping set right now
+            isCorrelated = false;
+          }
+        }
+
+        if (isCorrelated) {
+          LOG.info("Operator " + current.getIdentifier() + " " +
+              current.getName() + " is correlated");
+          Operator<? extends OperatorDesc> childOperator =
+              CorrelationUtilities.getSingleChild(current, true);
+          if (childOperator instanceof JoinOperator) {
+            // For a join, its other inputs reachable through the join conditions are
+            // collected together with this ReduceSinkOperator.
+            JoinOperator joinOp = (JoinOperator) childOperator;
+            JoinCondDesc[] joinConds = joinOp.getConf().getConds();
+            List<Operator<? extends OperatorDesc>> rsOps = joinOp.getParentOperators();
+            LinkedHashSet<ReduceSinkOperator> correlatedRsOps =
+                new LinkedHashSet<ReduceSinkOperator>();
+            analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, current, correlatedRsOps);
+            correlatedReduceSinkOperators.addAll(correlatedRsOps);
+          } else {
+            correlatedReduceSinkOperators.add(rsop);
+          }
+        } else {
+          LOG.info("Operator " + current.getIdentifier() + " " +
+              current.getName() + " is not correlated");
+          correlatedReduceSinkOperators.clear();
+        }
+      } else {
+        // Remaining case: a ReduceSinkOperator whose ColumnExprMap is null.
+        LOG.error("ReduceSinkOperator " + current.getIdentifier() + " does not have ColumnExprMap");
+        throw new SemanticException("CorrelationOptimizer cannot optimize this plan. " +
+            "ReduceSinkOperator " + current.getIdentifier()
+            + " does not have ColumnExprMap");
+      }
+      return correlatedReduceSinkOperators;
+    }
+
+    /** Start to exploit Job Flow Correlation from op.
+     * Example: here is the operator tree we have ...
+     *       JOIN2
+     *      /    \
+     *     RS4   RS5
+     *    /        \
+     *   GBY1     JOIN1
+     *    |       /    \
+     *   RS1     RS2   RS3
+     * The op will be RS4. If we can execute GBY1, JOIN1, and JOIN2 in
+     * the same reducer. This method will return [RS1, RS2, RS3].
+     * @param op
+     * @param correlationCtx
+     * @param correlation
+     * @return
+     * @throws SemanticException
+     */
+    private LinkedHashSet<ReduceSinkOperator> exploitJobFlowCorrelation(ReduceSinkOperator op,
+        CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation)
+        throws SemanticException {
+      correlationCtx.addWalked(op);
+      correlation.addToAllReduceSinkOperators(op);
+      boolean shouldDetect = true;
+      LinkedHashSet<ReduceSinkOperator> reduceSinkOperators =
+          new LinkedHashSet<ReduceSinkOperator>();
+      List<ExprNodeDesc> keyCols = op.getConf().getKeyCols();
+      List<ExprNodeDesc> partitionCols = op.getConf().getPartitionCols();
+      for (ExprNodeDesc key : keyCols) {
+        if (!(key instanceof ExprNodeColumnDesc)) {
+          shouldDetect = false;
+        }
+      }
+      for (ExprNodeDesc key : partitionCols) {
+        if (!(key instanceof ExprNodeColumnDesc)) {
+          shouldDetect = false;
+        }
+      }
+      GroupByOperator cGBY =
+          CorrelationUtilities.getSingleChild(op, GroupByOperator.class);
+      if (cGBY != null) {
+        if (CorrelationUtilities.hasGroupingSet(op) ||
+            cGBY.getConf().isGroupingSetsPresent()) {
+          // Do not support grouping set right now
+          shouldDetect = false;
+        }
+      }
+
+      if (shouldDetect) {
+        LinkedHashSet<ReduceSinkOperator> newReduceSinkOperators =
+            new LinkedHashSet<ReduceSinkOperator>();
+        String sortOrder = op.getConf().getOrder();
+        for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+          LOG.info("Operator " + op.getIdentifier()
+              + ": start detecting correlation from this operator");
+          LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
+              findCorrelatedReduceSinkOperators(op, keyCols, partitionCols,
+                  sortOrder, parent, correlation);
+          if (correlatedReduceSinkOperators.size() == 0) {
+            newReduceSinkOperators.add(op);
+          } else {
+            for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) {
+              LinkedHashSet<ReduceSinkOperator> exploited =
+                  exploitJobFlowCorrelation(rsop, correlationCtx, correlation);
+              if (exploited.size() == 0) {
+                newReduceSinkOperators.add(rsop);
+              } else {
+                newReduceSinkOperators.addAll(exploited);
+              }
+            }
+          }
+        }
+        reduceSinkOperators.addAll(newReduceSinkOperators);
+      }
+      return reduceSinkOperators;
+    }
+
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      CorrelationNodeProcCtx corrCtx = (CorrelationNodeProcCtx) ctx;
+      ReduceSinkOperator op = (ReduceSinkOperator) nd;
+
+      // Check if we have visited this operator
+      if (corrCtx.isWalked(op)) {
+        return null;
+      }
+
+      LOG.info("Walk to operator " + op.getIdentifier() + " " + op.getName());
+
+      Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(op, true);
+      if (!(child instanceof JoinOperator) && !(child instanceof GroupByOperator)) {
+        corrCtx.addWalked(op);
+        return null;
+      }
+
+      // detect correlations
+      IntraQueryCorrelation correlation = new IntraQueryCorrelation(corrCtx.minReducer());
+      List<ReduceSinkOperator> topReduceSinkOperators =
+          CorrelationUtilities.findSiblingReduceSinkOperators(op);
+      List<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+      // Adjust the number of reducers of this correlation based on
+      // those top layer ReduceSinkOperators.
+      for (ReduceSinkOperator rsop : topReduceSinkOperators) {
+        if (!correlation.adjustNumReducers(rsop.getConf().getNumReducers())) {
+          // If we have a conflict on the number of reducers, we will not optimize
+          // this plan from here.
+          corrCtx.addWalked(op);
+          return null;
+        }
+      }
+      for (ReduceSinkOperator rsop : topReduceSinkOperators) {
+        LinkedHashSet<ReduceSinkOperator> thisBottomReduceSinkOperators =
+            exploitJobFlowCorrelation(rsop, corrCtx, correlation);
+        if (thisBottomReduceSinkOperators.size() == 0) {
+          thisBottomReduceSinkOperators.add(rsop);
+        }
+        bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+      }
+
+      if (!topReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+        LOG.info("has job flow correlation");
+        correlation.setJobFlowCorrelation(true, bottomReduceSinkOperators);
+      }
+
+      if (correlation.hasJobFlowCorrelation()) {
+        corrCtx.addCorrelation(correlation);
+      } else {
+        // Since we cannot merge operators into a single MR job from here,
+        // we should remove ReduceSinkOperators added into walked in exploitJFC
+        corrCtx.removeWalkedAll(correlation.getAllReduceSinkOperators());
+      }
+
+      corrCtx.addWalked(op);
+      return null;
+    }
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+        LOG.info("Walk to operator " + op.getIdentifier() + " "
+            + op.getName() + ". No actual work to do");
+        CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+        if (op.getName().equals(MapJoinOperator.getOperatorName())) {
+          correlationCtx.setAbort(true);
+          correlationCtx.getAbortReasons().add("Found MAPJOIN");
+        }
+        if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+          correlationCtx.incrementFileSinkOperatorCount();
+        }
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Context shared by the node processors during correlation detection. It keeps the
+   * abort flag with its reasons, the ReduceSinkOperators already walked, the detected
+   * correlations, and the number of FileSinkOperators seen so far.
+   */
+  protected class CorrelationNodeProcCtx extends AbstractCorrelationProcCtx {
+
+    // Whether the optimizer should leave the operator tree unchanged.
+    private boolean abort;
+    // Human readable reasons for the abort; logged with a "-- " prefix by transform().
+    private final List<String> abortReasons;
+
+    private final Set<ReduceSinkOperator> walked;
+
+    private final List<IntraQueryCorrelation> correlations;
+
+    private int fileSinkOperatorCount;
+
+    public CorrelationNodeProcCtx(ParseContext pctx) {
+      super(pctx);
+      walked = new HashSet<ReduceSinkOperator>();
+      correlations = new ArrayList<IntraQueryCorrelation>();
+      abort = false;
+      abortReasons = new ArrayList<String>();
+      fileSinkOperatorCount = 0;
+    }
+
+    public void setAbort(boolean abort) {
+      this.abort = abort;
+    }
+
+    public boolean isAbort() {
+      return abort;
+    }
+
+    /**
+     * @return the live, modifiable list of abort reasons
+     */
+    public List<String> getAbortReasons() {
+      return abortReasons;
+    }
+
+    public void addCorrelation(IntraQueryCorrelation correlation) {
+      correlations.add(correlation);
+    }
+
+    public List<IntraQueryCorrelation> getCorrelations() {
+      return correlations;
+    }
+
+    public boolean isWalked(ReduceSinkOperator op) {
+      return walked.contains(op);
+    }
+
+    public void addWalked(ReduceSinkOperator op) {
+      walked.add(op);
+    }
+
+    public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+      walked.addAll(c);
+    }
+
+    public void removeWalked(ReduceSinkOperator op) {
+      walked.remove(op);
+    }
+
+    public void removeWalkedAll(Collection<ReduceSinkOperator> c) {
+      walked.removeAll(c);
+    }
+
+    /**
+     * Counts one more FileSinkOperator. When the second one is counted, the abort flag
+     * is set and the reason is recorded once.
+     */
+    public void incrementFileSinkOperatorCount() {
+      fileSinkOperatorCount++;
+      if (fileSinkOperatorCount == 2) {
+        abort = true;
+        // No "-- " prefix here: it is added when the reasons are logged.
+        abortReasons.add(
+            "Currently, a query with multiple FileSinkOperators is not supported.");
+      }
+    }
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+/**
+ * Utilities for both CorrelationOptimizer and ReduceSinkDeDuplication.
+ *
+ */
+public final class CorrelationUtilities {
+
+  /**
+   * Checks whether columns holds an expression that is the same as expr.
+   *
+   * @param expr the expression to look for
+   * @param columns the candidate expressions; null entries are skipped
+   * @return true if a non-null entry of columns reports isSame(expr), false otherwise
+   */
+  protected static boolean isExisted(ExprNodeDesc expr, List<ExprNodeDesc> columns) {
+    for (ExprNodeDesc candidate : columns) {
+      if (candidate == null) {
+        continue;
+      }
+      if (candidate.isSame(expr)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Looks up the column name whose mapped expression is the same as expr.
+   *
+   * @param opColumnExprMap a map from column names to expressions; null values are skipped
+   * @param expr the expression to look for
+   * @return the key of the first entry (in the map's iteration order) whose value reports
+   *         isSame(expr), or null if there is no such entry
+   */
+  protected static String getColumnName(
+      Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+    for (Entry<String, ExprNodeDesc> mapping : opColumnExprMap.entrySet()) {
+      ExprNodeDesc candidate = mapping.getValue();
+      if (candidate == null) {
+        continue;
+      }
+      if (candidate.isSame(expr)) {
+        return mapping.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether the single parent of cRS is a GroupByOperator whose descriptor reports
+   * that grouping sets are present.
+   *
+   * @param cRS the ReduceSinkOperator whose parent is inspected
+   * @return true only if cRS has exactly one parent, that parent is a GroupByOperator, and
+   *         its descriptor has grouping sets
+   * @throws SemanticException declared by the parent lookup
+   */
+  protected static boolean hasGroupingSet(ReduceSinkOperator cRS) throws SemanticException {
+    GroupByOperator parentGby = getSingleParent(cRS, GroupByOperator.class);
+    return parentGby != null && parentGby.getConf().isGroupingSetsPresent();
+  }
+
+  /**
+   * Returns the only parent of the given operator.
+   *
+   * @param operator the input operator
+   * @param throwException whether to throw an exception when the input operator does not
+   *        have exactly one parent
+   * @return the single parent, or null when the input operator does not have exactly one
+   *         parent and throwException is false
+   * @throws SemanticException if throwException is true and the input operator has no
+   *         parent or more than one parent
+   */
+  protected static Operator<?> getSingleParent(Operator<?> operator,
+      boolean throwException) throws SemanticException {
+    List<Operator<?>> parents = operator.getParentOperators();
+    if (parents != null && parents.size() == 1) {
+      return parents.get(0);
+    }
+    if (throwException) {
+      // An empty parent list is treated like a missing one: the caller asked for a failure
+      // whenever there is not exactly one parent, so null must not leak out here.
+      if (parents == null || parents.isEmpty()) {
+        throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+            operator.getIdentifier() + ") does not have any parent, but we expect 1 parent.");
+      }
+      throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+          operator.getIdentifier() + ") has " + parents.size() +
+          " parents, but we expect 1 parent.");
+    }
+    return null;
+  }
+
+  protected static Operator<?> getSingleParent(Operator<?> operator) throws SemanticException {
+    return getSingleParent(operator, false);
+  }
+
+  /**
+   * Returns the only child of the given operator.
+   *
+   * @param operator the input operator
+   * @param throwException whether to throw an exception when the input operator does not
+   *        have exactly one child
+   * @return the single child, or null when the input operator does not have exactly one
+   *         child and throwException is false
+   * @throws SemanticException if throwException is true and the input operator has no
+   *         child or more than one child
+   */
+  protected static Operator<?> getSingleChild(Operator<?> operator,
+      boolean throwException) throws SemanticException {
+    List<Operator<?>> children = operator.getChildOperators();
+    if (children != null && children.size() == 1) {
+      return children.get(0);
+    }
+    if (throwException) {
+      // An empty child list is treated like a missing one: the caller asked for a failure
+      // whenever there is not exactly one child, so null must not leak out here.
+      if (children == null || children.isEmpty()) {
+        throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+            operator.getIdentifier() + ") does not have any child, but we expect 1 child.");
+      }
+      throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+          operator.getIdentifier() + ") has " + children.size() +
+          " children, but we expect 1 child.");
+    }
+    return null;
+  }
+
+  protected static Operator<?> getSingleChild(Operator<?> operator) throws SemanticException {
+    return getSingleChild(operator, false);
+  }
+
+  /**
+   * Returns the single child of operator if it is an instance of the requested type.
+   *
+   * @param operator the input operator
+   * @param type the class the single child must be an instance of
+   * @return the single child cast to type, or null when there is no single child or it is
+   *         not an instance of type
+   * @throws SemanticException declared by the child lookup
+   */
+  protected static <T> T getSingleChild(Operator<?> operator, Class<T> type)
+      throws SemanticException {
+    Operator<?> onlyChild = getSingleChild(operator);
+    if (type.isInstance(onlyChild)) {
+      return type.cast(onlyChild);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the single parent of operator if it is an instance of the requested type.
+   *
+   * @param operator the input operator
+   * @param type the class the single parent must be an instance of
+   * @return the single parent cast to type, or null when there is no single parent or it is
+   *         not an instance of type
+   * @throws SemanticException declared by the parent lookup
+   */
+  protected static <T> T getSingleParent(Operator<?> operator, Class<T> type)
+      throws SemanticException {
+    Operator<?> onlyParent = getSingleParent(operator);
+    if (type.isInstance(onlyParent)) {
+      return type.cast(onlyParent);
+    }
+    return null;
+  }
+
+  /**
+   * Picks the operator to start from for a group-by ending in cRS: the single parent of cRS
+   * when that parent is a GroupByOperator (the map-side aggregation GBY), otherwise cRS
+   * itself.
+   *
+   * @param cRS the ReduceSinkOperator feeding the group-by
+   * @return the GroupByOperator parent of cRS, or cRS when there is no such parent
+   * @throws SemanticException declared by the parent lookup
+   */
+  protected static Operator<?> getStartForGroupBy(ReduceSinkOperator cRS)
+      throws SemanticException {
+    Operator<?> above = getSingleParent(cRS);
+    if (above instanceof GroupByOperator) {
+      return above;
+    }
+    return cRS;
+  }
+
+
+  protected static boolean[] getSortedTags(JoinOperator joinOp) {
+    boolean[] result = new boolean[joinOp.getParentOperators().size()];
+    for (int tag = 0; tag < result.length; tag++) {
+      result[tag] = isSortedTag(joinOp, tag);
+    }
+    return result;
+  }
+
+  // for left outer joins, left alias is sorted but right alias might be not
+  // (nulls, etc.). vice versa.
+  protected static boolean isSortedTag(JoinOperator joinOp, int tag) {
+    for (JoinCondDesc cond : joinOp.getConf().getConds()) {
+      switch (cond.getType()) {
+        case JoinDesc.LEFT_OUTER_JOIN:
+          if (cond.getRight() == tag) {
+            return false;
+          }
+          continue;
+        case JoinDesc.RIGHT_OUTER_JOIN:
+          if (cond.getLeft() == tag) {
+            return false;
+          }
+          continue;
+        case JoinDesc.FULL_OUTER_JOIN:
+          if (cond.getLeft() == tag || cond.getRight() == tag) {
+            return false;
+          }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Finds the first sorted tag whose parent expression is the same as cexpr once cexpr has
+   * been backtracked from child to the parent with that tag.
+   *
+   * @param cexpr the expression on the child side
+   * @param pexprs one expression per tag, compared against the backtracked cexpr
+   * @param child the operator cexpr belongs to
+   * @param parents one operator per tag, used as the backtrack target
+   * @param sorted one flag per tag; tags whose flag is false are skipped
+   * @return the first matching tag, or -1 if no tag matches
+   * @throws SemanticException declared by the backtracking
+   */
+  protected static int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child,
+      Operator[] parents, boolean[] sorted) throws SemanticException {
+    int tag = 0;
+    while (tag < parents.length) {
+      if (sorted[tag]) {
+        ExprNodeDesc parentExpr = pexprs[tag];
+        ExprNodeDesc traced = ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]);
+        if (parentExpr.isSame(traced)) {
+          return tag;
+        }
+      }
+      tag++;
+    }
+    return -1;
+  }
+
+  /**
+   * Single-result variant of findPossibleParents.
+   *
+   * @param start the operator to search upwards from
+   * @param target the operator class to look for
+   * @param trustScript whether the search may pass through ScriptOperators
+   * @return the only element found by findPossibleParents, or null when that search yields
+   *         null or an array whose length is not 1
+   * @throws SemanticException declared by the search
+   */
+  protected static <T extends Operator<?>> T findPossibleParent(Operator<?> start, Class<T> target,
+      boolean trustScript) throws SemanticException {
+    T[] candidates = findPossibleParents(start, target, trustScript);
+    if (candidates == null || candidates.length != 1) {
+      return null;
+    }
+    return candidates[0];
+  }
+
+  /**
+   * Walks up the single-parent chain starting above start, looking for operators of the
+   * target class.
+   * <ul>
+   * <li>If an operator assignable to target is met, a one-element array holding it is
+   * returned.</li>
+   * <li>If a JoinOperator is met first, the result of findParents for that join is
+   * returned.</li>
+   * <li>The walk only passes through Select, Filter, Extract, Forward and ReduceSink
+   * operators, and through Script operators when trustScript is true; any other operator
+   * ends the search with null.</li>
+   * </ul>
+   *
+   * @param start the operator to search upwards from (start itself is not examined)
+   * @param target the operator class to look for
+   * @param trustScript whether the search may pass through ScriptOperators
+   * @return the operators found, or null when the search ends without a match
+   * @throws SemanticException declared by the parent lookup
+   */
+  @SuppressWarnings("unchecked")
+  protected static <T extends Operator<?>> T[] findPossibleParents(
+      Operator<?> start, Class<T> target,
+      boolean trustScript) throws SemanticException {
+    Operator<?> cursor = getSingleParent(start);
+    while (cursor != null) {
+      if (target.isAssignableFrom(cursor.getClass())) {
+        T[] single = (T[]) Array.newInstance(target, 1);
+        single[0] = (T) cursor;
+        return single;
+      }
+      if (cursor instanceof JoinOperator) {
+        return findParents((JoinOperator) cursor, target);
+      }
+      if (cursor instanceof ScriptOperator) {
+        // a script may change the rows arbitrarily, so it is only crossed when trusted
+        if (!trustScript) {
+          return null;
+        }
+      } else {
+        boolean passThrough = cursor instanceof SelectOperator
+            || cursor instanceof FilterOperator
+            || cursor instanceof ExtractOperator
+            || cursor instanceof ForwardOperator
+            || cursor instanceof ReduceSinkOperator;
+        if (!passThrough) {
+          return null;
+        }
+      }
+      cursor = getSingleParent(cursor);
+    }
+    return null;
+  }
+
+  /**
+   * For every parent (tag) of the join, walks up the single-parent chain, starting with the
+   * parent itself, until an operator assignable to target is found.
+   *
+   * @param join the join whose inputs are searched
+   * @param target the operator class to look for
+   * @return an array with one found operator per parent of join, indexed by tag
+   * @throws SemanticException declared by the parent lookup
+   * @throws IllegalStateException if no operator of the target class is found for a tag
+   */
+  @SuppressWarnings("unchecked")
+  protected static <T extends Operator<?>> T[] findParents(JoinOperator join, Class<T> target)
+      throws SemanticException {
+    List<Operator<?>> joinInputs = join.getParentOperators();
+    T[] found = (T[]) Array.newInstance(target, joinInputs.size());
+    for (int tag = 0; tag < found.length; tag++) {
+      Operator<?> cursor = joinInputs.get(tag);
+      while (cursor != null && found[tag] == null) {
+        if (target.isAssignableFrom(cursor.getClass())) {
+          found[tag] = (T) cursor;
+        } else {
+          cursor = getSingleParent(cursor);
+        }
+      }
+      if (found[tag] == null) {
+        throw new IllegalStateException("failed to find " + target.getSimpleName()
+            + " from " + join + " on tag " + tag);
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Search the query plan tree from startPoint towards the bottom, always following the
+   * first parent. If there is no ReduceSinkOperator between startPoint and the corresponding
+   * TableScanOperator, return the corresponding TableScanOperator. Otherwise, return null.
+   * @param startPoint the operator which the search will start at (startPoint itself is not
+   * examined, only its ancestors)
+   * @return the TableScanOperator traced from startPoint. Null, if the search encounters any
+   * ReduceSinkOperator, or reaches an operator without parents (null or empty parent list)
+   * before finding a TableScanOperator.
+   */
+  protected static TableScanOperator findTableScanOperator(
+      Operator<? extends OperatorDesc> startPoint) {
+    Operator<? extends OperatorDesc> thisOp = startPoint;
+    while (true) {
+      List<Operator<? extends OperatorDesc>> parents = thisOp.getParentOperators();
+      // Guard both the null and the empty case so that get(0) cannot fail.
+      if (parents == null || parents.isEmpty()) {
+        return null;
+      }
+      thisOp = parents.get(0);
+      if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) {
+        return null;
+      }
+      if (thisOp.getName().equals(TableScanOperator.getOperatorName())) {
+        return (TableScanOperator) thisOp;
+      }
+    }
+  }
+
+  /**
+   * Find all sibling ReduceSinkOperators (which have the same child operator of op) of op (op
+   * included).
+   * @param op the ReduceSinkOperator whose siblings are collected; it must have exactly one
+   * child
+   * @return all parents of the single child of op, in the child's parent order
+   * @throws SemanticException if op does not have exactly one child (as reported by
+   * getSingleChild), or if any parent of that child is not a ReduceSinkOperator
+   */
+  public static List<ReduceSinkOperator> findSiblingReduceSinkOperators(ReduceSinkOperator op)
+      throws SemanticException {
+    List<ReduceSinkOperator> siblingRSs = new ArrayList<ReduceSinkOperator>();
+    Operator<? extends OperatorDesc> child = getSingleChild(op, true);
+    for (Operator<? extends OperatorDesc> parent: child.getParentOperators()) {
+      if (parent instanceof ReduceSinkOperator) {
+        siblingRSs.add((ReduceSinkOperator)parent);
+      } else {
+        throw new SemanticException("A sibling of a ReduceSinkOperator is not a " +
+            "ReduceSinkOperator.");
+      }
+    }
+    return siblingRSs;
+  }
+
+  /**
+   * Find all sibling operators of op, i.e. all operators that share op's single child
+   * (op included).
+   * @param op the operator whose siblings are requested
+   * @return the parent list of the single child of op (the list itself, not a copy)
+   * @throws SemanticException raised by the child lookup, which is asked to throw
+   */
+  public static List<Operator<? extends OperatorDesc>> findSiblingOperators(
+      Operator<? extends OperatorDesc> op)
+      throws SemanticException {
+    return getSingleChild(op, true).getParentOperators();
+  }
+
+  /**
+   * Replaces childRS with a SelectOperator (see replaceOperatorWithSelect) and configures
+   * the new select from childRS's descriptor: its output column names are the output value
+   * column names of childRS and its column list is the value columns of childRS.
+   *
+   * @param childRS the ReduceSinkOperator to replace
+   * @param context the parse context of the plan
+   * @param procCtx the processing context in which removed operators are recorded
+   * @return the SelectOperator that took the place of childRS
+   * @throws SemanticException declared by the replacement
+   */
+  protected static SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS,
+      ParseContext context, AbstractCorrelationProcCtx procCtx) throws SemanticException {
+    SelectOperator replacement = replaceOperatorWithSelect(childRS, context, procCtx);
+    SelectDesc replacementDesc = replacement.getConf();
+    replacementDesc.setOutputColumnNames(childRS.getConf().getOutputValueColumnNames());
+    replacementDesc.setColList(childRS.getConf().getValueCols());
+    return replacement;
+  }
+
+  /**
+   * Replaces the given operator (the cRS) with a new SelectOperator. The new select is
+   * created as a child of the operator's single parent, reuses the operator's row resolver
+   * and column expression map, and takes over the operator's children.
+   * If the single child of the operator is an ExtractOperator (EXT), that EXT is removed
+   * from the plan as well.
+   *
+   * @param operator the operator to replace
+   * @param context the parse context; the new select is registered in its operator parse
+   *        context map
+   * @param procCtx the processing context in which the removed operators are recorded
+   * @return the new SelectOperator
+   * @throws SemanticException declared by the parent/child lookups
+   */
+  protected static SelectOperator replaceOperatorWithSelect(Operator<?> operator,
+      ParseContext context, AbstractCorrelationProcCtx procCtx)
+      throws SemanticException {
+    RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+    SelectDesc select = new SelectDesc(null, null);
+
+    // NOTE(review): both lookups are the non-throwing variants and return null when the
+    // operator does not have exactly one parent / child; a null parent fails on the next
+    // statement with a NullPointerException.
+    Operator<?> parent = getSingleParent(operator);
+    Operator<?> child = getSingleChild(operator);
+
+    // detach everything from the parent before the new select is made its child
+    parent.getChildOperators().clear();
+
+    SelectOperator sel = (SelectOperator) putOpInsertMap(
+        OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+            .getColumnInfos()), parent), inputRR, context);
+
+    sel.setColumnExprMap(operator.getColumnExprMap());
+
+    // hand the children of the replaced operator over to the new select
+    sel.setChildOperators(operator.getChildOperators());
+    for (Operator<? extends Serializable> ch : operator.getChildOperators()) {
+      ch.replaceParent(operator, sel);
+    }
+    if (child instanceof ExtractOperator) {
+      removeOperator(child, getSingleChild(child), sel, context);
+      procCtx.addRemovedOperator(child);
+    }
+    // fully disconnect the replaced operator from the plan
+    operator.setChildOperators(null);
+    operator.setParentOperators(null);
+    procCtx.addRemovedOperator(operator);
+    return sel;
+  }
+
+  /**
+   * Removes cRS (and, if present, the map-side GroupByOperator above it) from the plan and
+   * switches the reduce-side GroupByOperator cGBYr to COMPLETE mode so that it performs the
+   * whole aggregation.
+   *
+   * @param cRS the ReduceSinkOperator to remove
+   * @param cGBYr the reduce-side GroupByOperator below cRS
+   * @param context the parse context holding the row resolvers of the operators
+   * @param procCtx the processing context in which removed operators are recorded
+   * @throws SemanticException declared by the parent lookups and the backtracking
+   */
+  protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr,
+      ParseContext context, AbstractCorrelationProcCtx procCtx) throws SemanticException {
+
+    Operator<?> parent = getSingleParent(cRS);
+
+    if (parent instanceof GroupByOperator) {
+      // pRS-cGBYm-cRS-cGBYr (map aggregation) --> pRS-cGBYr(COMPLETE)
+      // copies desc of cGBYm to cGBYr and remove cGBYm and cRS
+      GroupByOperator cGBYm = (GroupByOperator) parent;
+
+      cGBYr.getConf().setKeys(cGBYm.getConf().getKeys());
+      cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators());
+      // the aggregator descriptors are now shared with cGBYr, so this updates cGBYr's too
+      for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) {
+        aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE);
+      }
+      cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
+      cGBYr.setSchema(cGBYm.getSchema());
+      RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
+      context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
+    } else {
+      // pRS-cRS-cGBYr (no map aggregation) --> pRS-cGBYr(COMPLETE)
+      // revert expressions of cGBYr to that of cRS
+      cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS));
+      for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) {
+        aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS));
+      }
+
+      Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
+      RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+
+      Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
+      RowResolver newRR = new RowResolver();
+
+      // rebuild the row resolver and the column expression map of cGBYr, one output column
+      // at a time, with the expressions backtracked through cRS
+      List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
+      for (int i = 0; i < outputCols.size(); i++) {
+        String colName = outputCols.get(i);
+        String[] nm = oldRR.reverseLookup(colName);
+        ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+        newRR.put(nm[0], nm[1], colInfo);
+        ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
+        if (colExpr != null) {
+          newMap.put(colInfo.getInternalName(), colExpr);
+        }
+      }
+      cGBYr.setColumnExprMap(newMap);
+      cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
+      context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
+    }
+    cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
+
+    // cRS must be unlinked first: this makes cGBYr the child of parent, which the removal
+    // of parent below relies on
+    removeOperator(cRS, cGBYr, parent, context);
+    procCtx.addRemovedOperator(cRS);
+
+    if (parent instanceof GroupByOperator) {
+      removeOperator(parent, cGBYr, getSingleParent(parent), context);
+      // NOTE(review): the operator removed just above is parent (cGBYm), but cGBYr is the one
+      // recorded as removed here -- confirm whether this is intentional.
+      procCtx.addRemovedOperator(cGBYr);
+    }
+  }
+
+  /**
+   * Throws an exception if the input operator is null; does nothing otherwise.
+   * @param operator the operator to check
+   * @throws SemanticException if operator is null
+   */
+  protected static void isNullOperator(Operator<?> operator) throws SemanticException {
+    if (operator == null) {
+      // A null operator cannot be asked for its name or ID, so the message carries neither;
+      // dereferencing it here would raise a NullPointerException instead of the intended
+      // SemanticException.
+      throw new SemanticException("Operator is null.");
+    }
+  }
+
+  /**
+   * Inserts newOperator between parent and child, which must be directly connected and be
+   * each other's only child / only parent.
+   * @param newOperator the operator that will be inserted between parent and child
+   * @param parent the operator that becomes the only parent of newOperator
+   * @param child the operator that becomes the only child of newOperator
+   * @throws SemanticException if the null check on any argument fails, if parent is not the
+   * only parent of child, or if child is not the only child of parent
+   */
+  protected static void insertOperatorBetween(
+      Operator<?> newOperator, Operator<?> parent, Operator<?> child)
+          throws SemanticException {
+    isNullOperator(newOperator);
+    isNullOperator(parent);
+    isNullOperator(child);
+
+    Operator<?> onlyParentOfChild = getSingleParent(child);
+    if (onlyParentOfChild != parent) {
+      throw new SemanticException("Operator " + parent.getName() + " (ID: " +
+          parent.getIdentifier() + ") is not the only parent of Operator " +
+          child.getName() + " (ID: " + child.getIdentifier() + ")");
+    }
+    Operator<?> onlyChildOfParent = getSingleChild(parent);
+    if (onlyChildOfParent != child) {
+      throw new SemanticException("Operator " + child.getName() + " (ID: " +
+          child.getIdentifier() + ") is not the only child of Operator " +
+          parent.getName() + " (ID: " + parent.getIdentifier() + ")");
+    }
+
+    // links of the new operator itself
+    newOperator.setParentOperators(Utilities.makeList(parent));
+    newOperator.setChildOperators(Utilities.makeList(child));
+
+    // links of the two neighbours, which now point at the new operator
+    child.setParentOperators(Utilities.makeList(newOperator));
+    parent.setChildOperators(Utilities.makeList(newOperator));
+  }
+
+  /**
+   * Unlinks target from the plan. In every parent of target, target is replaced by child;
+   * in every child of target, target is replaced by parent. Afterwards the parent and child
+   * lists of target are set to null and target is removed from the operator parse context
+   * map of context.
+   *
+   * @param target the operator to remove
+   * @param child the operator that takes target's place in the parents of target
+   * @param parent the operator that takes target's place in the children of target
+   * @param context the parse context target is unregistered from
+   */
+  protected static void removeOperator(Operator<?> target, Operator<?> child, Operator<?> parent,
+      ParseContext context) {
+    for (Operator<?> upstream : target.getParentOperators()) {
+      upstream.replaceChild(target, child);
+    }
+    for (Operator<?> downstream : target.getChildOperators()) {
+      downstream.replaceParent(target, parent);
+    }
+    // drop the links held by target itself
+    target.setChildOperators(null);
+    target.setParentOperators(null);
+    context.getOpParseCtx().remove(target);
+  }
+
+  /**
+   * Registers op in the operator parse context map of context, under a new OpParseContext
+   * built from rr.
+   *
+   * @param op the operator to register
+   * @param rr the row resolver wrapped in the new OpParseContext
+   * @param context the parse context whose map is updated
+   * @return op itself
+   */
+  protected static Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+      ParseContext context) {
+    context.getOpParseCtx().put(op, new OpParseContext(rr));
+    return op;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+/**
+ * IntraQueryCorrelation records a sub-tree of the query plan tree which can be
+ * evaluated in a single MR job. The boundary of this sub-tree is recorded by
+ * the ReduceSinkOperators at the bottom of this sub-tree.
+ * Also, allReduceSinkOperators in IntraQueryCorrelation contains all
+ * ReduceSinkOperators of this sub-tree.
+ */
+public class IntraQueryCorrelation {
+  private boolean jobFlowCorrelation = false;
+
+  // The bottom layer ReduceSinkOperators. They mark the boundary of the sub-tree which can
+  // be evaluated in a single MR job. Stays null until setJobFlowCorrelation is called.
+  private List<ReduceSinkOperator> bottomReduceSinkOperators;
+
+  // The number of reducer(s) to be used for the bottom layer ReduceSinkOperators.
+  // A value that is not positive means no number has been decided yet.
+  private int numReducers = -1;
+
+  // The min number of reducer(s) for the bottom layer ReduceSinkOperators, to avoid a query
+  // being executed on too small a number of reducers.
+  private final int minReducers;
+
+  // All ReduceSinkOperators in this sub-tree. This set is used when we start to remove
+  // unnecessary ReduceSinkOperators.
+  private final Set<ReduceSinkOperator> allReduceSinkOperators =
+      new HashSet<ReduceSinkOperator>();
+
+  // Since we merge multiple operation paths, we assign new tags to bottom layer
+  // ReduceSinkOperators. This mapping is used to map new tags to original tags associated
+  // with these bottom layer ReduceSinkOperators.
+  private final Map<Integer, Integer> newTagToOldTag = new HashMap<Integer, Integer>();
+
+  // A map from new tags to indices of children of DemuxOperator (the first Operator at the
+  // Reduce side of the optimized plan)
+  private final Map<Integer, Integer> newTagToChildIndex = new HashMap<Integer, Integer>();
+
+  /**
+   * @param minReducers the minimum number of reducers accepted by adjustNumReducers
+   */
+  public IntraQueryCorrelation(int minReducers) {
+    this.minReducers = minReducers;
+  }
+
+  /**
+   * @return the map from new tags to old tags (the map itself, not a copy)
+   */
+  public Map<Integer, Integer> getNewTagToOldTag() {
+    return this.newTagToOldTag;
+  }
+
+  /**
+   * @return the map from new tags to child indices (the map itself, not a copy)
+   */
+  public Map<Integer, Integer> getNewTagToChildIndex() {
+    return this.newTagToChildIndex;
+  }
+
+  /**
+   * Records, for a new tag, both the old tag and the child index it maps to.
+   *
+   * @param newTag the new tag
+   * @param oldTag the original tag newTag maps to
+   * @param childIndex the child index newTag maps to
+   */
+  public void setNewTag(Integer newTag, Integer oldTag, Integer childIndex) {
+    this.newTagToOldTag.put(newTag, oldTag);
+    this.newTagToChildIndex.put(newTag, childIndex);
+  }
+
+  /**
+   * @param rsop the ReduceSinkOperator to add to the set of all ReduceSinkOperators
+   */
+  public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+    this.allReduceSinkOperators.add(rsop);
+  }
+
+  /**
+   * @return the set of all ReduceSinkOperators of this sub-tree (the set itself, not a copy)
+   */
+  public Set<ReduceSinkOperator> getAllReduceSinkOperators() {
+    return this.allReduceSinkOperators;
+  }
+
+  /**
+   * Sets the job flow correlation flag together with the bottom layer ReduceSinkOperators.
+   *
+   * @param jobFlowCorrelation the new value of the flag
+   * @param bottomReduceSinkOperators the bottom layer ReduceSinkOperators; the list is
+   *        stored as is, not copied
+   */
+  public void setJobFlowCorrelation(boolean jobFlowCorrelation,
+      List<ReduceSinkOperator> bottomReduceSinkOperators) {
+    this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+    this.jobFlowCorrelation = jobFlowCorrelation;
+  }
+
+  /**
+   * @return the job flow correlation flag
+   */
+  public boolean hasJobFlowCorrelation() {
+    return this.jobFlowCorrelation;
+  }
+
+  /**
+   * @return the bottom layer ReduceSinkOperators, or null if setJobFlowCorrelation has not
+   *         been called
+   */
+  public List<ReduceSinkOperator> getBottomReduceSinkOperators() {
+    return this.bottomReduceSinkOperators;
+  }
+
+  /**
+   * @return the number of reducers decided so far; not positive if none has been decided
+   */
+  public int getNumReducers() {
+    return this.numReducers;
+  }
+
+  /**
+   * Checks whether a ReduceSinkOperator with the given number of reducers can be part of
+   * this correlation and, if no number of reducers has been decided yet, adopts the given
+   * one.
+   *
+   * @param newNumReducers the number of reducers of the candidate ReduceSinkOperator; a
+   *        negative value always passes and changes nothing; 0 is rejected by an assertion
+   * @return false if newNumReducers is positive and either less than the minimum number of
+   *         reducers or different from an already decided positive number of reducers;
+   *         true otherwise
+   */
+  public boolean adjustNumReducers(int newNumReducers) {
+    assert newNumReducers != 0;
+    if (newNumReducers <= 0) {
+      // nothing to check and nothing to adopt
+      return true;
+    }
+    if (newNumReducers < minReducers) {
+      // A ReduceSinkOperator with fewer reducers than the minimum is not considered as a
+      // correlated ReduceSinkOperator
+      return false;
+    }
+    if (numReducers <= 0) {
+      // no number of reducers has been decided so far: adopt the new one
+      numReducers = newNumReducers;
+      return true;
+    }
+    // Both numbers are positive: the ReduceSinkOperator is only considered as a correlated
+    // ReduceSinkOperator when they agree
+    return newNumReducers == numReducers;
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.optimizer.correlation.CorrelationOptimizer.CorrelationNodeProcCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DemuxDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * QueryPlanTreeTransformation contains static methods used to transform
+ * the query plan tree (operator tree) based on the correlation we have
+ * detected by Correlation Optimizer.
+ */
+public class QueryPlanTreeTransformation {
+  private static final Log LOG = LogFactory.getLog(QueryPlanTreeTransformation.class.getName());
+
+  private static void setNewTag(IntraQueryCorrelation correlation,
+      List<Operator<? extends OperatorDesc>> childrenOfDemux,
+      ReduceSinkOperator rsop, Map<ReduceSinkOperator, Integer> bottomRSToNewTag)
+          throws SemanticException {
+    int newTag = bottomRSToNewTag.get(rsop);
+    int oldTag = rsop.getConf().getTag();
+    // if this child of dispatcher does not use tag, we just set the oldTag to 0;
+    if (oldTag == -1) {
+      oldTag = 0;
+    }
+    Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
+    if (!childrenOfDemux.contains(child)) {
+      childrenOfDemux.add(child);
+    }
+    int childIndex = childrenOfDemux.indexOf(child);
+    correlation.setNewTag(newTag, oldTag, childIndex);
+    rsop.getConf().setTag(newTag);
+  }
+
+  /** Based on the correlation, we transform the query plan tree (operator tree).
+   * In here, we first create DemuxOperator and all bottom ReduceSinkOperators
+   * (bottom means near TableScanOperator) in the correlation will be
+   * the parents of the DemuxOperator. We also reassign tags to those
+   * ReduceSinkOperators. Then, we use MuxOperators to replace ReduceSinkOperators
+   * which are not bottom ones in this correlation.
+   * Example: The original operator tree is ...
+   *      JOIN2
+   *      /    \
+   *     RS4   RS5
+   *    /        \
+   *   GBY1     JOIN1
+   *    |       /    \
+   *   RS1     RS2   RS3
+   * If GBY1, JOIN1, and JOIN2 can be executed in the same reducer
+   * (optimized by Correlation Optimizer).
+   * The new operator tree will be ...
+   *      JOIN2
+   *        |
+   *       MUX
+   *      /   \
+   *    GBY1  JOIN1
+   *      \    /
+   *       DEMUX
+   *      /  |  \
+   *     /   |   \
+   *    /    |    \
+   *   RS1   RS2   RS3
+   * @param pCtx
+   * @param corrCtx
+   * @param correlation
+   * @throws SemanticException
+   */
+  protected static void applyCorrelation(
+      ParseContext pCtx,
+      CorrelationNodeProcCtx corrCtx,
+      IntraQueryCorrelation correlation)
+      throws SemanticException {
+
+    final List<ReduceSinkOperator> bottomReduceSinkOperators =
+        correlation.getBottomReduceSinkOperators();
+    final int numReducers = correlation.getNumReducers();
+    // Operators that will hang directly below the new DemuxOperator (each listed once).
+    List<Operator<? extends OperatorDesc>> childrenOfDemux =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    // The bottom ReduceSinkOperators; they become the parents of the DemuxOperator.
+    List<Operator<? extends OperatorDesc>> parentRSsOfDemux =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    // Index into childrenOfDemux -> number of parents that child had before any rewiring.
+    Map<Integer, Integer> childIndexToOriginalNumParents =
+        new HashMap<Integer, Integer>();
+    // Key/value serialization info of each bottom RS, in the same order as the new tags.
+    List<TableDesc> keysSerializeInfos = new ArrayList<TableDesc>();
+    List<TableDesc> valuessSerializeInfos = new ArrayList<TableDesc>();
+    Map<ReduceSinkOperator, Integer> bottomRSToNewTag =
+        new HashMap<ReduceSinkOperator, Integer>();
+    int newTag = 0;
+    // Pass 1: every bottom RS gets the correlation's reducer count and a fresh,
+    // consecutive tag; its single child is registered as a child of the Demux.
+    for (ReduceSinkOperator rsop: bottomReduceSinkOperators) {
+      rsop.getConf().setNumReducers(numReducers);
+      bottomRSToNewTag.put(rsop, newTag);
+      parentRSsOfDemux.add(rsop);
+      keysSerializeInfos.add(rsop.getConf().getKeySerializeInfo());
+      valuessSerializeInfos.add(rsop.getConf().getValueSerializeInfo());
+      Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
+      if (!childrenOfDemux.contains(child)) {
+        childrenOfDemux.add(child);
+        int childIndex = childrenOfDemux.size() - 1;
+        childIndexToOriginalNumParents.put(childIndex, child.getNumParent());
+      }
+      newTag++;
+    }
+
+    // Pass 2: setNewTag is defined outside this chunk; presumably it records the
+    // newTag -> oldTag and newTag -> childIndex mappings on the correlation, which
+    // are read back below when the DemuxDesc is built -- TODO confirm.
+    for (ReduceSinkOperator rsop: bottomReduceSinkOperators) {
+      setNewTag(correlation, childrenOfDemux, rsop, bottomRSToNewTag);
+    }
+
+    // Create the DemuxOperator
+    DemuxDesc demuxDesc =
+        new DemuxDesc(
+            correlation.getNewTagToOldTag(),
+            correlation.getNewTagToChildIndex(),
+            childIndexToOriginalNumParents,
+            keysSerializeInfos,
+            valuessSerializeInfos);
+    Operator<? extends OperatorDesc> demuxOp = OperatorFactory.get(demuxDesc);
+    demuxOp.setChildOperators(childrenOfDemux);
+    demuxOp.setParentOperators(parentRSsOfDemux);
+    // Rewire the parent lists of the Demux's children: every parent that is a bottom RS
+    // is replaced by demuxOp, position by position.
+    for (Operator<? extends OperatorDesc> child: childrenOfDemux) {
+      List<Operator<? extends OperatorDesc>> parentsWithMultipleDemux =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      boolean hasBottomReduceSinkOperators = false;
+      boolean hasNonBottomReduceSinkOperators = false;
+      for (int i = 0; i < child.getParentOperators().size(); i++) {
+        Operator<? extends OperatorDesc> p = child.getParentOperators().get(i);
+        assert p instanceof ReduceSinkOperator;
+        ReduceSinkOperator rsop = (ReduceSinkOperator)p;
+        if (bottomReduceSinkOperators.contains(rsop)) {
+          hasBottomReduceSinkOperators = true;
+          parentsWithMultipleDemux.add(demuxOp);
+        } else {
+          hasNonBottomReduceSinkOperators = true;
+          parentsWithMultipleDemux.add(rsop);
+        }
+      }
+      if (hasBottomReduceSinkOperators && hasNonBottomReduceSinkOperators) {
+        // Mixed parents: keep one entry per original parent position, so demuxOp
+        // may appear more than once next to the untouched non-bottom RSs.
+        child.setParentOperators(parentsWithMultipleDemux);
+      } else {
+        // Otherwise the child ends up with demuxOp as its only parent.
+        child.setParentOperators(Utilities.makeList(demuxOp));
+      }
+    }
+    // Each bottom RS now feeds only the DemuxOperator.
+    for (Operator<? extends OperatorDesc> parent: parentRSsOfDemux) {
+      parent.setChildOperators(Utilities.makeList(demuxOp));
+    }
+
+    // replace all ReduceSinkOperators which are not at the bottom of
+    // this correlation to MuxOperators
+    // handledRSs collects sibling RSs that were already folded into a MuxOperator while
+    // another RS was processed; they are skipped here and detached after the loop.
+    Set<ReduceSinkOperator> handledRSs = new HashSet<ReduceSinkOperator>();
+    for (ReduceSinkOperator rsop : correlation.getAllReduceSinkOperators()) {
+      if (!bottomReduceSinkOperators.contains(rsop)) {
+        if (handledRSs.contains(rsop)) {
+          continue;
+        }
+        Operator<? extends OperatorDesc> childOP =
+            CorrelationUtilities.getSingleChild(rsop, true);
+        if (childOP instanceof GroupByOperator) {
+          // RS feeding a GroupByOperator: the RS is removed by the helper (defined in
+          // CorrelationUtilities, not visible here), then a MuxOperator is spliced
+          // between childOP and whatever single parent childOP has afterwards.
+          CorrelationUtilities.removeReduceSinkForGroupBy(
+              rsop, (GroupByOperator)childOP, pCtx, corrCtx);
+          List<Operator<? extends OperatorDesc>> parentsOfMux =
+              new ArrayList<Operator<? extends OperatorDesc>>();
+          Operator<? extends OperatorDesc> parentOp =
+              CorrelationUtilities.getSingleParent(childOP, true);
+          parentsOfMux.add(parentOp);
+          Operator<? extends OperatorDesc> mux = OperatorFactory.get(
+              new MuxDesc(parentsOfMux));
+          mux.setChildOperators(Utilities.makeList(childOP));
+          mux.setParentOperators(parentsOfMux);
+          childOP.setParentOperators(Utilities.makeList(mux));
+          parentOp.setChildOperators(Utilities.makeList(mux));
+        } else {
+          // childOP is expected to be a JoinOperator (this is not checked here)
+          List<Operator<? extends OperatorDesc>> parentsOfMux =
+              new ArrayList<Operator<? extends OperatorDesc>>();
+          List<Operator<? extends OperatorDesc>> siblingOPs =
+              CorrelationUtilities.findSiblingOperators(rsop);
+          // Build the parent list of the new MuxOperator from the siblings of rsop:
+          // a DemuxOperator is used as is, a ReduceSinkOperator is replaced by its parent.
+          for (Operator<? extends OperatorDesc> op: siblingOPs) {
+            if (op instanceof DemuxOperator) {
+              parentsOfMux.add(op);
+            } else if (op instanceof ReduceSinkOperator){
+              GroupByOperator pGBYm =
+                  CorrelationUtilities.getSingleParent(op, GroupByOperator.class);
+              if (pGBYm != null) {
+                // This is a semi join.
+                // This map-side GroupByOperator needs to be removed
+                CorrelationUtilities.removeOperator(
+                    pGBYm, op, CorrelationUtilities.getSingleParent(pGBYm, true), pCtx);
+              }
+              handledRSs.add((ReduceSinkOperator)op);
+              parentsOfMux.add(CorrelationUtilities.getSingleParent(op, true));
+            } else {
+              throw new SemanticException("An slibing of ReduceSinkOperator is nethier a " +
+                  "DemuxOperator nor a ReduceSinkOperator");
+            }
+          }
+          // Note: the MuxDesc is built from the original siblings (siblingOPs), while the
+          // operator's parent list is the rewritten one (parentsOfMux).
+          MuxDesc muxDesc = new MuxDesc(siblingOPs);
+          Operator<? extends OperatorDesc> mux = OperatorFactory.get(muxDesc);
+          mux.setChildOperators(Utilities.makeList(childOP));
+          mux.setParentOperators(parentsOfMux);
+
+          for (Operator<? extends OperatorDesc> op: parentsOfMux) {
+            if (op instanceof DemuxOperator) {
+              // op is a DemuxOperator and it directly connects to childOP.
+              // We will add this MuxOperator between DemuxOperator
+              // and childOP.
+              if (op.getChildOperators().contains(childOP)) {
+                op.replaceChild(childOP, mux);
+              }
+            } else {
+              // op is not a DemuxOperator, so it should have
+              // a single child.
+              op.setChildOperators(Utilities.makeList(mux));
+            }
+          }
+          childOP.setParentOperators(Utilities.makeList(mux));
+        }
+      }
+    }
+    // Detach the RSs that were replaced by MuxOperators and drop them from the
+    // parse context's operator map.
+    for (ReduceSinkOperator rsop: handledRSs) {
+      rsop.setChildOperators(null);
+      rsop.setParentOperators(null);
+      pCtx.getOpParseCtx().remove(rsop);
+    }
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * If two reduce sink operators share the same partition/sort columns and order,
+ * they can be merged. This should happen after map join optimization because map
+ * join optimization will remove reduce sink operators.
+ *
+ * This optimizer removes/replaces the child RS (not the parent), which is the safer
+ * approach when the plan is traversed with DefaultGraphWalker.
+ */
+public class ReduceSinkDeDuplication implements Transform {
+
+  private static final String RS = ReduceSinkOperator.getOperatorName();
+  private static final String GBY = GroupByOperator.getOperatorName();
+  private static final String JOIN = JoinOperator.getOperatorName();
+
+  protected ParseContext pGraphContext;
+
+  /**
+   * Runs reduce-sink de-duplication over the operator graph of the given context.
+   * Registers the RS-RS and RS-GBY-RS rules (plus the JOIN-RS rule when both auto
+   * map-join conversion settings are disabled) and walks the graph starting from
+   * its top operators.
+   *
+   * @param pctx parse context holding the operator graph; also kept in pGraphContext
+   * @return the parse context that was passed in
+   * @throws SemanticException propagated from the graph walk
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+
+    // context object shared by every processor fired during this walk
+    ReduceSinkDeduplicateProcCtx dedupCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+
+    // for auto convert map-joins, it is not safe to dedup in here (todo),
+    // so JOIN-RS merging is enabled only when both conversion settings are off
+    boolean autoConvertJoin = pctx.getConf().getBoolVar(HIVECONVERTJOIN) ||
+        pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK);
+
+    // Registration order matters: if multiple rules match with the same cost, the
+    // last rule is chosen as the processor, see DefaultRuleDispatcher#dispatch()
+    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+    Rule rsToRs = new RuleRegExp("R1", RS + "%.*%" + RS + "%");
+    rules.put(rsToRs, ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    Rule rsGbyToRs = new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%");
+    rules.put(rsGbyToRs, ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc());
+    if (!autoConvertJoin) {
+      Rule joinToRs = new RuleRegExp("R3", JOIN + "%.*%" + RS + "%");
+      rules.put(joinToRs, ReduceSinkDeduplicateProcFactory.getJoinReducerProc());
+    }
+    // TODO RS+JOIN
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher dispatcher = new DefaultRuleDispatcher(
+        ReduceSinkDeduplicateProcFactory.getDefaultProc(), rules, dedupCtx);
+    GraphWalker walker = new DefaultGraphWalker(dispatcher);
+
+    // Start the walk from the top operators of the plan
+    ArrayList<Node> startNodes = new ArrayList<Node>(pGraphContext.getTopOps().values());
+    walker.startWalking(startNodes, null);
+    return pGraphContext;
+  }
+
+  /**
+   * Context object handed to the de-duplication processors. It adds nothing on top
+   * of {@link AbstractCorrelationProcCtx}; it only gives the context a dedicated type.
+   */
+  protected class ReduceSinkDeduplicateProcCtx extends AbstractCorrelationProcCtx {
+
+    /**
+     * @param parseContext parse context forwarded to the base context
+     */
+    public ReduceSinkDeduplicateProcCtx(ParseContext parseContext) {
+      super(parseContext);
+    }
+  }
+
+  /**
+   * Creates the node processors registered by {@code transform}. Every call
+   * returns a new processor instance.
+   */
+  static class ReduceSinkDeduplicateProcFactory {
+
+    /** @return the processor passed to the dispatcher as its default (a no-op) */
+    public static NodeProcessor getDefaultProc() {
+      return new DefaultProc();
+    }
+
+    /** @return a processor for the pRS-cRS pattern */
+    public static NodeProcessor getReducerReducerProc() {
+      return new ReducerReducerProc();
+    }
+
+    /** @return a processor for the pRS-pGBY-cRS pattern */
+    public static NodeProcessor getGroupbyReducerProc() {
+      return new GroupbyReducerProc();
+    }
+
+    /** @return a processor for the pRS-pJOIN-cRS pattern */
+    public static NodeProcessor getJoinReducerProc() {
+      return new JoinReducerProc();
+    }
+  }
+
+  /**
+   * Processor handed to the dispatcher as its default; it does nothing and
+   * always returns {@code null}.
+   */
+  static class DefaultProc implements NodeProcessor {
+    @Override
+    public Object process(Node node, Stack<Node> path, NodeProcessorCtx ctx,
+        Object... outputs) throws SemanticException {
+      return null;
+    }
+  }
+
+  public abstract static class AbsctractReducerReducerProc implements NodeProcessor {
+
+    /**
+     * Entry point invoked for a matched child ReduceSinkOperator (cRS). Dispatches to
+     * one of the two abstract {@code process} variants depending on the single child
+     * of cRS:
+     * <ul>
+     * <li>GroupByOperator without grouping sets: {@code process(cRS, cGBY, dedupCtx)}</li>
+     * <li>ExtractOperator: {@code process(cRS, dedupCtx)}</li>
+     * <li>anything else (including JoinOperator, which is not supported): no-op</li>
+     * </ul>
+     *
+     * @param nd the child ReduceSinkOperator being visited
+     * @param stack operator path of the current walk (unused here)
+     * @param procCtx must be a ReduceSinkDeduplicateProcCtx
+     * @param nodeOutputs outputs of already processed nodes (unused here)
+     * @return {@code false} when nothing was done, otherwise the result of the
+     *         delegated {@code process} variant
+     * @throws SemanticException propagated from the delegated variant
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ReduceSinkDeduplicateProcCtx dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+      // operators already taken out of the plan by an earlier merge are skipped
+      if (dedupCtx.hasBeenRemoved((Operator<?>) nd)) {
+        return false;
+      }
+      ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+      Operator<?> child = CorrelationUtilities.getSingleChild(cRS);
+      if (child instanceof JoinOperator) {
+        return false; // not supported
+      }
+      if (child instanceof GroupByOperator) {
+        GroupByOperator cGBY = (GroupByOperator) child;
+        // grouping sets on either the RS or the GBY are not handled
+        if (CorrelationUtilities.hasGroupingSet(cRS) || cGBY.getConf().isGroupingSetsPresent()) {
+          return false;
+        }
+        return process(cRS, cGBY, dedupCtx);
+      }
+      if (child instanceof ExtractOperator) {
+        return process(cRS, dedupCtx);
+      }
+      return false;
+    }
+
+    /**
+     * Handles a child RS whose single child is an ExtractOperator.
+     *
+     * @param cRS child ReduceSinkOperator
+     * @param dedupCtx de-duplication context
+     * @return implementations in this file return {@code true} if cRS was merged
+     *         away, {@code false} otherwise
+     */
+    protected abstract Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException;
+
+    /**
+     * Handles a child RS whose single child is a GroupByOperator without grouping sets.
+     *
+     * @param cRS child ReduceSinkOperator
+     * @param cGBY the GroupByOperator directly below cRS
+     * @param dedupCtx de-duplication context
+     * @return implementations in this file return {@code true} if cRS was merged
+     *         away, {@code false} otherwise
+     */
+    protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException;
+
+    /**
+     * Decides whether the child RS can be folded into the RSs feeding the parent join.
+     * For the JOIN-RS case it is generally not possible to merge if the child has more
+     * key/partition columns than the parents, so that case is rejected up front.
+     * Every child key and partition column must be located among the parents' columns
+     * at the same position (via {@code CorrelationUtilities.indexOf}). On success, the
+     * child's reducer count is copied to every parent RS when the child fixes the
+     * number of reducers and the parents do not.
+     *
+     * @param cRS child ReduceSinkOperator
+     * @param pJoin parent JoinOperator; all of its parents are treated as RSs
+     * @param minReducer minimum reducer count the child must have for its count to be
+     *        moved to the parents
+     * @return true if the child can be merged, false otherwise
+     */
+    protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+        throws SemanticException {
+      List<Operator<?>> joinParents = pJoin.getParentOperators();
+      ReduceSinkOperator[] parentRSs =
+          joinParents.toArray(new ReduceSinkOperator[joinParents.size()]);
+      ReduceSinkDesc childConf = cRS.getConf();
+      ReduceSinkDesc firstParentConf = parentRSs[0].getConf();
+
+      // the child may not be more specific than the parents in keys or partitioning
+      if (childConf.getKeyCols().size() > firstParentConf.getKeyCols().size()) {
+        return false;
+      }
+      if (childConf.getPartitionCols().size() > firstParentConf.getPartitionCols().size()) {
+        return false;
+      }
+
+      Integer reducerDirection =
+          checkNumReducer(childConf.getNumReducers(), firstParentConf.getNumReducers());
+      if (reducerDirection == null) {
+        return false;
+      }
+      if (reducerDirection > 0 && childConf.getNumReducers() < minReducer) {
+        return false;
+      }
+
+      // only compatibility of the sort order is required here
+      if (checkOrder(childConf.getOrder(), firstParentConf.getOrder()) == null) {
+        return false;
+      }
+
+      boolean[] sortedTags = CorrelationUtilities.getSortedTags(pJoin);
+
+      // every child key column must be found among the parents' key columns
+      int numChildKeys = childConf.getKeyCols().size();
+      for (int col = 0; col < numChildKeys; col++) {
+        ExprNodeDesc childExpr = childConf.getKeyCols().get(col);
+        ExprNodeDesc[] parentExprs = new ExprNodeDesc[parentRSs.length];
+        for (int tag = 0; tag < parentRSs.length; tag++) {
+          parentExprs[tag] = parentRSs[tag].getConf().getKeyCols().get(col);
+        }
+        if (CorrelationUtilities.indexOf(childExpr, parentExprs, cRS, parentRSs, sortedTags) < 0) {
+          return false;
+        }
+      }
+
+      // same check for the partition columns
+      int numChildParts = childConf.getPartitionCols().size();
+      for (int col = 0; col < numChildParts; col++) {
+        ExprNodeDesc childExpr = childConf.getPartitionCols().get(col);
+        ExprNodeDesc[] parentExprs = new ExprNodeDesc[parentRSs.length];
+        for (int tag = 0; tag < parentRSs.length; tag++) {
+          parentExprs[tag] = parentRSs[tag].getConf().getPartitionCols().get(col);
+        }
+        if (CorrelationUtilities.indexOf(childExpr, parentExprs, cRS, parentRSs, sortedTags) < 0) {
+          return false;
+        }
+      }
+
+      // child fixes the reducer count while the parents do not: push it up
+      if (reducerDirection > 0) {
+        for (ReduceSinkOperator parentRS : parentRSs) {
+          parentRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Tries to merge the child RS into the parent RS. Current RSDedup removes/replaces
+     * the child RS, so for every category in which the child is the more specific one,
+     * the child's configuration is copied to the parent RS.
+     *
+     * @param cRS child ReduceSinkOperator
+     * @param pRS parent ReduceSinkOperator, updated in place on success
+     * @param minReducer minimum reducer count the child must have for its count to be
+     *        moved to the parent
+     * @return true if the two RSs are mergeable, false otherwise
+     */
+    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      int[] directions = checkStatus(cRS, pRS, minReducer);
+      if (directions == null) {
+        return false;
+      }
+      // layout of the array: {key columns, partition columns, order, number of reducers}
+      boolean moveKeys = directions[0] > 0;
+      boolean movePartitions = directions[1] > 0;
+      boolean moveOrder = directions[2] > 0;
+      boolean moveNumReducers = directions[3] > 0;
+
+      if (moveKeys) {
+        ArrayList<ExprNodeDesc> childKeys = cRS.getConf().getKeyCols();
+        pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKeys, cRS, pRS));
+      }
+      if (movePartitions) {
+        ArrayList<ExprNodeDesc> childParts = cRS.getConf().getPartitionCols();
+        pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childParts, cRS, pRS));
+      }
+      if (moveOrder) {
+        pRS.getConf().setOrder(cRS.getConf().getOrder());
+      }
+      if (moveNumReducers) {
+        pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+      }
+      return true;
+    }
+
+    /**
+     * Computes the merge direction between two RSs for each criterion and returns them
+     * as {@code {keys, partition columns, ordering, number of reducers}}. Returns null
+     * as soon as one criterion turns out not to be mergeable.
+     *
+     * Each entry is -1, 0 or 1:
+     * 0 means both configurations are the same for that criterion,
+     * -1 means the parent RS is more specific than the child RS,
+     * 1 means the child RS is more specific than the parent RS.
+     */
+    private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      ReduceSinkDesc childConf = cRS.getConf();
+      ReduceSinkDesc parentConf = pRS.getConf();
+
+      Integer orderDir = checkOrder(childConf.getOrder(), parentConf.getOrder());
+      if (orderDir == null) {
+        return null;
+      }
+
+      Integer reducerDir =
+          checkNumReducer(childConf.getNumReducers(), parentConf.getNumReducers());
+      if (reducerDir == null) {
+        return null;
+      }
+      // the child's reducer count is only moved up when it reaches the minimum
+      if (reducerDir > 0 && childConf.getNumReducers() < minReducer) {
+        return null;
+      }
+
+      List<ExprNodeDesc> childKeys = childConf.getKeyCols();
+      List<ExprNodeDesc> parentKeys = parentConf.getKeyCols();
+      Integer keyDir = checkExprs(childKeys, parentKeys, cRS, pRS);
+      if (keyDir == null) {
+        return null;
+      }
+
+      List<ExprNodeDesc> childParts = childConf.getPartitionCols();
+      List<ExprNodeDesc> parentParts = parentConf.getPartitionCols();
+      Integer partDir = checkExprs(childParts, parentParts, cRS, pRS);
+      if (partDir == null) {
+        return null;
+      }
+
+      return new int[] {keyDir, partDir, orderDir, reducerDir};
+    }
+
+    /**
+     * Compares the expression lists of child and parent. The overlapping part must be
+     * the same on both sides, and if the child has more expressions than the parent,
+     * the non-overlapping part must be backtrackable to the parent.
+     *
+     * @return 0 if both lists are empty (or null), -1 if only the child's list is
+     *         empty, 1 if only the parent's list is empty and every child expression
+     *         can be backtracked, the result of {@code sameKeys} if both are non-empty,
+     *         or null if the lists are not mergeable
+     */
+    private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+        ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+      boolean childEmpty = ckeys == null || ckeys.isEmpty();
+      boolean parentEmpty = pkeys == null || pkeys.isEmpty();
+
+      if (childEmpty) {
+        if (parentEmpty) {
+          return 0;
+        }
+        return -1;
+      }
+      if (!parentEmpty) {
+        return sameKeys(ckeys, pkeys, cRS, pRS);
+      }
+      // only the child has expressions: each one must be resolvable in the parent
+      for (ExprNodeDesc childExpr : ckeys) {
+        if (ExprNodeDescUtils.backtrack(childExpr, cRS, pRS) == null) {
+          // cKey is not present in parent
+          return null;
+        }
+      }
+      return 1;
+    }
+
+    // backtrack key exprs of child to parent and compare it with parent's
+    protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+        Operator<?> child, Operator<?> parent) throws SemanticException {
+      int common = Math.min(cexprs.size(), pexprs.size());
+      int limit = Math.max(cexprs.size(), pexprs.size());
+      int i = 0;
+      for (; i < common; i++) {
+        ExprNodeDesc pexpr = pexprs.get(i);
+        ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+        if (cexpr == null || !pexpr.isSame(cexpr)) {
+          return null;
+        }
+      }
+      for (; i < limit; i++) {
+        if (cexprs.size() > pexprs.size()) {
+          if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+            // cKey is not present in parent
+            return null;
+          }
+        }
+      }
+      return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+    }
+
+    /**
+     * Compares the sort-order strings of child and parent. The order of overlapping
+     * keys must be exactly the same. Null and blank strings are treated as "no order".
+     *
+     * @param corder order string of the child RS, may be null
+     * @param porder order string of the parent RS, may be null
+     * @return 0 if both are empty or equal, a negative value if the child's order is
+     *         empty or shorter, a positive value if the parent's order is empty or
+     *         shorter, or null if the overlapping prefixes differ
+     */
+    protected Integer checkOrder(String corder, String porder) {
+      String childOrder = corder == null ? "" : corder.trim();
+      String parentOrder = porder == null ? "" : porder.trim();
+
+      if (childOrder.isEmpty()) {
+        if (parentOrder.isEmpty()) {
+          return 0;
+        }
+        return -1;
+      }
+      if (parentOrder.isEmpty()) {
+        return 1;
+      }
+      // compare only the part both strings have in common
+      int overlap = Math.min(childOrder.length(), parentOrder.length());
+      if (!childOrder.regionMatches(0, parentOrder, 0, overlap)) {
+        return null;
+      }
+      return Integer.valueOf(childOrder.length()).compareTo(parentOrder.length());
+    }
+
+    /**
+     * Compares the reducer counts of child and parent. A negative count (-1) means the
+     * RS can have any number of reducers, which is generally true except for order-by
+     * or forced bucketing cases. If both counts are set, they must be the same.
+     *
+     * @param creduce number of reducers of the child RS
+     * @param preduce number of reducers of the parent RS
+     * @return 0 if both are unset or equal, -1 if only the child is unset, 1 if only
+     *         the parent is unset, or null if both are set to different values
+     */
+    protected Integer checkNumReducer(int creduce, int preduce) {
+      boolean childUnset = creduce < 0;
+      boolean parentUnset = preduce < 0;
+
+      if (childUnset && parentUnset) {
+        return 0;
+      }
+      if (childUnset) {
+        return -1;
+      }
+      if (parentUnset) {
+        return 1;
+      }
+      if (creduce == preduce) {
+        return 0;
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Processor for a child RS that has a GroupByOperator fed by a parent RS above it.
+   */
+  static class GroupbyReducerProc extends AbsctractReducerReducerProc {
+
+    /**
+     * Handles the pRS-pGBY-cRS pattern: when cRS can be merged into pRS, cRS is
+     * replaced with a select operator.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      GroupByOperator parentGby = CorrelationUtilities.findPossibleParent(
+          cRS, GroupByOperator.class, dedupCtx.trustScript());
+      if (parentGby == null) {
+        return false;
+      }
+      ReduceSinkOperator parentRs = CorrelationUtilities.findPossibleParent(
+          parentGby, ReduceSinkOperator.class, dedupCtx.trustScript());
+      if (parentRs == null || !merge(cRS, parentRs, dedupCtx.minReducer())) {
+        return false;
+      }
+      CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+          cRS, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+
+    /**
+     * Handles the pRS-pGBY-cRS-cGBY pattern: when cRS can be merged into pRS, cRS is
+     * removed in front of cGBY.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      Operator<?> searchStart = CorrelationUtilities.getStartForGroupBy(cRS);
+      GroupByOperator parentGby = CorrelationUtilities.findPossibleParent(
+          searchStart, GroupByOperator.class, dedupCtx.trustScript());
+      if (parentGby == null) {
+        return false;
+      }
+      // here the parent RS must be the direct, single parent of the parent GBY
+      ReduceSinkOperator parentRs =
+          CorrelationUtilities.getSingleParent(parentGby, ReduceSinkOperator.class);
+      if (parentRs == null || !merge(cRS, parentRs, dedupCtx.minReducer())) {
+        return false;
+      }
+      CorrelationUtilities.removeReduceSinkForGroupBy(
+          cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+  }
+
+  /**
+   * Processor for a child RS that has a JoinOperator above it. On a successful merge
+   * the join is flagged as fixed-as-sorted.
+   */
+  static class JoinReducerProc extends AbsctractReducerReducerProc {
+
+    /**
+     * Handles the pRS-pJOIN-cRS pattern: when cRS can be merged into the RSs feeding
+     * the join, cRS is replaced with a select operator.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      JoinOperator parentJoin = CorrelationUtilities.findPossibleParent(
+          cRS, JoinOperator.class, dedupCtx.trustScript());
+      if (parentJoin == null || !merge(cRS, parentJoin, dedupCtx.minReducer())) {
+        return false;
+      }
+      parentJoin.getConf().setFixedAsSorted(true);
+      CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+          cRS, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+
+    /**
+     * Handles the pRS-pJOIN-cRS-cGBY pattern: when cRS can be merged into the RSs
+     * feeding the join, cRS is removed in front of cGBY.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      Operator<?> searchStart = CorrelationUtilities.getStartForGroupBy(cRS);
+      JoinOperator parentJoin = CorrelationUtilities.findPossibleParent(
+          searchStart, JoinOperator.class, dedupCtx.trustScript());
+      if (parentJoin == null || !merge(cRS, parentJoin, dedupCtx.minReducer())) {
+        return false;
+      }
+      parentJoin.getConf().setFixedAsSorted(true);
+      CorrelationUtilities.removeReduceSinkForGroupBy(
+          cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+  }
+
+  /**
+   * Processor for a child RS that has another RS above it.
+   */
+  static class ReducerReducerProc extends AbsctractReducerReducerProc {
+
+    /**
+     * Handles the pRS-cRS pattern: when cRS can be merged into pRS, cRS is replaced
+     * with a select operator.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      ReduceSinkOperator parentRs = CorrelationUtilities.findPossibleParent(
+          cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
+      if (parentRs == null || !merge(cRS, parentRs, dedupCtx.minReducer())) {
+        return false;
+      }
+      CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+          cRS, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+
+    /**
+     * Handles the pRS-cRS-cGBY pattern: when cRS can be merged into pRS, cRS is
+     * removed in front of cGBY.
+     *
+     * @return true if cRS was merged away, false otherwise
+     */
+    @Override
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ReduceSinkDeduplicateProcCtx dedupCtx)
+        throws SemanticException {
+      Operator<?> searchStart = CorrelationUtilities.getStartForGroupBy(cRS);
+      ReduceSinkOperator parentRs = CorrelationUtilities.findPossibleParent(
+          searchStart, ReduceSinkOperator.class, dedupCtx.trustScript());
+      if (parentRs == null || !merge(cRS, parentRs, dedupCtx.minReducer())) {
+        return false;
+      }
+      CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+      return true;
+    }
+  }
+}