You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2012/09/30 22:41:03 UTC

svn commit: r1392105 [2/7] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimiz...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,1037 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBExpr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * Implementation of correlation optimizer. The optimization is based on
+ * the paper "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
+ * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf).
+ * This optimizer first detects three kinds of
+ * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC),
+ * and then merges correlated MapReduce-jobs (MR-jobs) into one MR-job.
+ * Since opColumnExprMap, opParseCtx, opRowResolver may be changed by
+ * other optimizers,
+ * currently, correlation optimizer has two phases. The first phase is the first transformation in
+ * the Optimizer. In the first phase, original opColumnExprMap, opParseCtx, opRowResolver
+ * will be recorded. Then, the second phase (the last transformation before SimpleFetchOptimizer)
+ * will perform correlation detection and query plan tree transformation.
+ *
+ * For the definitions of correlations, see the original paper of YSmart.
+ *
+ * Rules for merging correlated MR-jobs implemented in this correlation
+ * optimizer are:
+ * 1. If an MR-job for a Join operation has the same partitioning keys as all of its
+ * preceding MR-jobs, correlation optimizer merges these MR-jobs into one MR-job.
+ * 2. If an MR-job for a GroupBy and Aggregation operation has the same partitioning keys
+ * as its preceding MR-job, correlation optimizer merges these two MR-jobs into one MR-job.
+ *
+ * Note: In the current implementation, if correlation optimizer detects MR-jobs of a sub-plan tree
+ * are correlated, it transforms this sub-plan tree to a single MR-job when the input of this
+ * sub-plan tree is not a temporary table. Otherwise, the current implementation will ignore this
+ * sub-plan tree.
+ *
+ * Several pieces of future work would enhance the correlation optimizer.
+ * Here are four examples:
+ * 1. Add a new rule that is if two MR-jobs share the same
+ * partitioning keys and they have common input tables, merge these two MR-jobs into a single
+ * MR-job.
+ * 2. The current implementation detects MR-jobs which have the same partitioning keys
+ * as correlated MR-jobs. However, the condition of same partitioning keys can be relaxed to use
+ * common partitioning keys.
+ * 3. The current implementation cannot optimize MR-jobs for the
+ * aggregation functions with a distinct keyword, which should be supported in the future
+ * implementation of the correlation optimizer.
+ * 4. Optimize queries that involve self-joins.
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+  // Logger used by this optimizer and by its nested node processors.
+  static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+  // Alias -> table name, filled from the QB tree by initializeAliastoTabNameMapping.
+  private final Map<String, String> aliastoTabName;
+  // Alias -> source Table metadata, filled together with aliastoTabName.
+  private final Map<String, Table> aliastoTab;
+
+  /**
+   * Creates an optimizer with empty alias lookup tables and no parse context; the parse
+   * context is assigned on each call to transform.
+   */
+  public CorrelationOptimizer() {
+    this.aliastoTab = new HashMap<String, Table>();
+    this.aliastoTabName = new HashMap<String, String>();
+    this.pGraphContext = null;
+  }
+
+  /**
+   * Records, for the given query block and recursively for its sub-queries, the table name and
+   * the table metadata behind every alias.
+   *
+   * @param qb query block to scan; may be null
+   * @return false if this query block or a visited sub-query block is null (in which case
+   *         CorrelationOptimizer will not optimize the query, e.g. auto_join27.q),
+   *         true otherwise
+   */
+  private boolean initializeAliastoTabNameMapping(QB qb) {
+    if (qb == null) {
+      return false;
+    }
+    for (String tableAlias : qb.getAliases()) {
+      aliastoTabName.put(tableAlias, qb.getTabNameForAlias(tableAlias));
+      aliastoTab.put(tableAlias, qb.getMetaData().getSrcForAlias(tableAlias));
+    }
+    boolean allResolved = true;
+    for (String subqueryAlias : qb.getSubqAliases()) {
+      QBExpr subqueryExpr = qb.getSubqForAlias(subqueryAlias);
+      if (allResolved) {
+        // After the first failure the remaining sub-queries are no longer descended into.
+        allResolved = initializeAliastoTabNameMapping(subqueryExpr.getQB());
+      }
+    }
+    return allResolved;
+  }
+
+  // Parse context of the query being optimized; reassigned on every call to transform.
+  protected ParseContext pGraphContext;
+  // Operator -> parse context map taken from the current ParseContext.
+  private Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+  // Snapshots filled by addOperatorInfo. An entry is only written the first time an operator
+  // is seen, so later changes to the operator's metadata do not overwrite it.
+  private final Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx =
+      new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
+  private final Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver =
+      new LinkedHashMap<Operator<? extends OperatorDesc>, RowResolver>();
+  private final Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap =
+      new LinkedHashMap<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>>();
+
+  // True until transform has been called once; selects which phase transform executes.
+  private boolean isPhase1 = true;
+  // Set when the optimizer decides not to touch the query plan.
+  private boolean abort = false;
+
+  // Mappings between the two group-by plan shapes, read from the ParseContext in phase 2.
+  private Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+  private Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
+  /**
+   * Runs one phase of the correlation optimizer. The first call on an instance executes
+   * phase 1; every later call executes phase 2.
+   * Phase 1 walks the operator tree, records the current column expression map, parse context
+   * and row resolver of every operator, and counts the FileSinkOperators.
+   * Phase 2 finds correlations between operators and then transforms the query plan tree
+   * for every correlation that can be applied.
+   *
+   * @param pctx
+   *          current parse context
+   * @return the parse context that was passed in
+   * @throws SemanticException
+   */
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    if (isPhase1) {
+      pGraphContext = pctx;
+      opParseCtx = pctx.getOpParseCtx();
+
+      // No rules are registered, so the default processor is applied to every operator.
+      CorrelationNodePhase1ProcCtx phase1ProcCtx = new CorrelationNodePhase1ProcCtx();
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules,
+          phase1ProcCtx);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      // Create a list of topOp nodes
+      List<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(pGraphContext.getTopOps().values());
+      ogw.startWalking(topNodes, null);
+      // The next call runs phase 2, which gives up if more than one FileSinkOperator was seen.
+      isPhase1 = false;
+      abort = phase1ProcCtx.fileSinkOperatorCount > 1;
+    } else {
+      /*
+       * Types of correlations:
+       * 1) Input Correlation: Multiple nodes have input correlation
+       * (IC) if their input relation sets are not disjoint;
+       * 2) Transit Correlation: Multiple nodes have transit correlation
+       * (TC) if they have not only input correlation, but
+       * also the same partition key;
+       * 3) Job Flow Correlation: A node has job flow correlation
+       * (JFC) with one of its child nodes if it has the same
+       * partition key as that child node.
+       */
+
+      pGraphContext = pctx;
+      if (abort) {
+        //TODO: handle queries with multiple FileSinkOperators;
+        LOG.info("Abort. Reasons are ...");
+        LOG.info("-- Currently, a query with multiple FileSinkOperators are not supported.");
+        return pGraphContext;
+      }
+
+
+      opParseCtx = pctx.getOpParseCtx();
+
+      groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide();
+      groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide();
+
+      QB qb = pGraphContext.getQB();
+      abort = !initializeAliastoTabNameMapping(qb);
+      if (abort) {
+        LOG.info("Abort. Reasons are ...");
+        LOG.info("-- This query or its sub-queries has a null qb.");
+        return pGraphContext;
+      }
+
+      // 0: Replace all map-side group by pattern (GBY-RS-GBY) to
+      // non-map-side group by pattern (RS-GBY) if necessary
+      if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+        for (Entry<GroupByOperator, ReduceSinkOperator> entry:
+          groupbyMapSide2NonMapSide.entrySet()) {
+          // GBY-RS-GBY: the closing GroupByOperator is the grandchild of the opening one.
+          GroupByOperator mapSidePatternStart = entry.getKey();
+          GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+              .getChildOperators().get(0).getChildOperators().get(0);
+          // RS-GBY: the GroupByOperator is the direct child of the ReduceSinkOperator.
+          ReduceSinkOperator nonMapSidePatternStart = entry.getValue();
+          GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+              .getChildOperators().get(0);
+
+          List<Operator<? extends OperatorDesc>> parents = mapSidePatternStart.getParentOperators();
+          List<Operator<? extends OperatorDesc>> children = mapSidePatternEnd.getChildOperators();
+
+          // Splice the RS-GBY pattern into the place occupied by the GBY-RS-GBY pattern.
+          nonMapSidePatternStart.setParentOperators(parents);
+          nonMapSidePatternEnd.setChildOperators(children);
+
+          for (Operator<? extends OperatorDesc> parent: parents) {
+            parent.replaceChild(mapSidePatternStart, nonMapSidePatternStart);
+          }
+          for (Operator<? extends OperatorDesc> child: children) {
+            child.replaceParent(mapSidePatternEnd, nonMapSidePatternEnd);
+          }
+          // Record the metadata of the operators that were just spliced in.
+          addOperatorInfo(nonMapSidePatternStart);
+          addOperatorInfo(nonMapSidePatternEnd);
+        }
+      }
+
+      // 1: detect correlations
+      CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx();
+
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+          new CorrelationNodeProc());
+
+      Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      // Create a list of topOp nodes
+      List<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(pGraphContext.getTopOps().values());
+      ogw.startWalking(topNodes, null);
+      abort = correlationCtx.isAbort();
+      int correlationsAppliedCount = 0;
+      if (abort) {
+        LOG.info("Abort. Reasons are ...");
+        for (String reason: correlationCtx.getAbortReasons()) {
+          LOG.info("-- " + reason);
+        }
+      } else {
+        // 2: transform the query plan tree
+        LOG.info("Begain query plan transformation based on intra-query correlations. " +
+            correlationCtx.getCorrelations().size() + " correlation(s) to be applied");
+        for (IntraQueryCorrelation correlation : correlationCtx.getCorrelations()) {
+          boolean ret = CorrelationOptimizerUtils.applyCorrelation(
+              correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver,
+              groupbyNonMapSide2MapSide, originalOpParseCtx);
+          if (ret) {
+            correlationsAppliedCount++;
+          }
+        }
+      }
+
+      // 3: if no correlation applied, replace all non-map-side group by pattern (RS-GBY) to
+      // map-side group by pattern (GBY-RS-GBY) if necessary, i.e. undo step 0
+      if (correlationsAppliedCount == 0 &&
+          pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+        for (Entry<ReduceSinkOperator, GroupByOperator> entry:
+          groupbyNonMapSide2MapSide.entrySet()) {
+          GroupByOperator mapSidePatternStart = entry.getValue();
+          GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+              .getChildOperators().get(0).getChildOperators().get(0);
+          ReduceSinkOperator nonMapSidePatternStart = entry.getKey();
+          GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+              .getChildOperators().get(0);
+
+          List<Operator<? extends OperatorDesc>> parents = nonMapSidePatternStart.getParentOperators();
+          List<Operator<? extends OperatorDesc>> children = nonMapSidePatternEnd.getChildOperators();
+
+          mapSidePatternStart.setParentOperators(parents);
+          mapSidePatternEnd.setChildOperators(children);
+
+          for (Operator<? extends OperatorDesc> parent: parents) {
+            parent.replaceChild(nonMapSidePatternStart, mapSidePatternStart);
+          }
+          for (Operator<? extends OperatorDesc> child: children) {
+            child.replaceParent(nonMapSidePatternEnd, mapSidePatternEnd);
+          }
+        }
+      }
+      // This summary is logged on the abort path as well (with a count of 0).
+      LOG.info("Finish query plan transformation based on intra-query correlations. " +
+          correlationsAppliedCount + " correlation(s) actually be applied");
+    }
+    return pGraphContext;
+  }
+
+  /**
+   * Snapshots the column expression map, parse context and row resolver of the given operator
+   * into the "original" maps. An operator that already has an entry in one of those maps keeps
+   * its existing entry.
+   *
+   * @param op operator whose current metadata should be remembered
+   */
+  private void addOperatorInfo(Operator<? extends OperatorDesc> op) {
+    OpParseContext parseCtx = opParseCtx.get(op);
+    if (op.getColumnExprMap() != null && !originalOpColumnExprMap.containsKey(op)) {
+      originalOpColumnExprMap.put(op, op.getColumnExprMap());
+    }
+    if (parseCtx == null) {
+      // Without a parse context there is neither a context nor a row resolver to record.
+      return;
+    }
+    if (!originalOpParseCtx.containsKey(op)) {
+      originalOpParseCtx.put(op, parseCtx);
+    }
+    if (parseCtx.getRowResolver() != null && !originalOpRowResolver.containsKey(op)) {
+      originalOpRowResolver.put(op, parseCtx.getRowResolver());
+    }
+  }
+
+  /**
+   * Builds the processor used for every operator in phase 1. It records the operator's
+   * metadata through addOperatorInfo and counts FileSinkOperators in the phase 1 context.
+   *
+   * @return a processor whose process method always returns null
+   */
+  private NodeProcessor getPhase1DefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        Operator<? extends OperatorDesc> current = (Operator<? extends OperatorDesc>) nd;
+        addOperatorInfo(current);
+
+        boolean isFileSink = current.getName().equals(FileSinkOperator.getOperatorName());
+        if (isFileSink) {
+          CorrelationNodePhase1ProcCtx phase1Ctx = (CorrelationNodePhase1ProcCtx) procCtx;
+          phase1Ctx.fileSinkOperatorCount++;
+        }
+        return null;
+      }
+    };
+  }
+
+  private class CorrelationNodeProc implements NodeProcessor {
+
+    /**
+     * Follows the first-child chain below the given ReduceSinkOperator and returns the next
+     * ReduceSinkOperator on it.
+     *
+     * @param rsop operator to start from; the search begins at its first child
+     * @return the next ReduceSinkOperator below rsop, or null if a FileSinkOperator or an
+     *         operator without children is reached first
+     */
+    public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) {
+      Operator<? extends OperatorDesc> op = firstChildOrNull(rsop);
+      while (op != null && !op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+        if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+          return null;
+        }
+        // Only linear operator chains are expected between two ReduceSinkOperators.
+        assert op.getChildOperators() == null || op.getChildOperators().size() <= 1;
+        // A childless operator ends the search with null instead of failing on get(0).
+        op = firstChildOrNull(op);
+      }
+      return (ReduceSinkOperator) op;
+    }
+
+    /** Returns the first child of the given operator, or null if it has no children. */
+    private Operator<? extends OperatorDesc> firstChildOrNull(
+        Operator<? extends OperatorDesc> op) {
+      List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+      return (children == null || children.isEmpty()) ? null : children.get(0);
+    }
+
+    /**
+     * Adds curentRsOps to correlatedRsOps and then recursively adds the other join inputs that
+     * are reachable from it through the join conditions. From the left input of a condition the
+     * right input is followed for inner and left outer joins; from the right input the left
+     * input is followed for inner and right outer joins.
+     *
+     * @param joinConds join conditions of the join operator
+     * @param rsOps parent operators of the join operator; positions in this list are compared
+     *          with the left and right indexes of the join conditions
+     * @param curentRsOps input to start from; must be a ReduceSinkOperator
+     * @param correlatedRsOps result set, also used to stop at inputs that were already visited
+     */
+    private void analyzeReduceSinkOperatorsOfJoinOperator(JoinCondDesc[] joinConds,
+        List<Operator<? extends OperatorDesc>> rsOps, Operator<? extends OperatorDesc> curentRsOps,
+        Set<ReduceSinkOperator> correlatedRsOps) {
+      // Set.add returns false for an element that is already present, i.e. already visited.
+      if (!correlatedRsOps.add((ReduceSinkOperator) curentRsOps)) {
+        return;
+      }
+
+      final int position = rsOps.indexOf(curentRsOps);
+      for (JoinCondDesc cond : joinConds) {
+        final int joinType = cond.getType();
+        final int otherSide;
+        if (position == cond.getLeft()) {
+          if (joinType != JoinDesc.INNER_JOIN && joinType != JoinDesc.LEFT_OUTER_JOIN) {
+            continue;
+          }
+          otherSide = cond.getRight();
+        } else if (position == cond.getRight()) {
+          if (joinType != JoinDesc.INNER_JOIN && joinType != JoinDesc.RIGHT_OUTER_JOIN) {
+            continue;
+          }
+          otherSide = cond.getLeft();
+        } else {
+          continue;
+        }
+        analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, rsOps.get(otherSide),
+            correlatedRsOps);
+      }
+    }
+
+    /**
+     * Searches from the given operator towards its parents for ReduceSinkOperators whose key
+     * columns match the given key columns. On the way up, key column names are translated
+     * through the recorded (original) column expression map of each operator; only keys that
+     * map to plain column references are kept.
+     *
+     * @param op operator to inspect
+     * @param keyColumns key column names to look up in the recorded column expression map of op
+     * @param correlation correlation being built; its list of RS-GBY patterns to be replaced is
+     *          extended, or cleared when a ReduceSinkOperator turns out not to be correlated
+     * @return the correlated ReduceSinkOperators found at or above op; empty if there are none
+     * @throws SemanticException if a ReduceSinkOperator without a recorded column expression
+     *           map is reached
+     */
+    private Set<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+        Operator<? extends OperatorDesc> op, Set<String> keyColumns,
+        IntraQueryCorrelation correlation) throws SemanticException {
+
+      LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName());
+
+      Set<ReduceSinkOperator> correlatedReduceSinkOps = new HashSet<ReduceSinkOperator>();
+      if (op.getParentOperators() == null) {
+        return correlatedReduceSinkOps;
+      }
+      if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) {
+        // Case 1: no recorded column mapping for this operator, so the key columns are passed
+        // to the parents unchanged.
+        for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+          correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+              parent, keyColumns, correlation));
+        }
+      } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) {
+        // Case 2: translate the key columns through this operator's recorded column mapping
+        // before continuing with the parents.
+        Set<String> newKeyColumns = new HashSet<String>();
+        for (String keyColumn : keyColumns) {
+          ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+          if (col instanceof ExprNodeColumnDesc) {
+            newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+          }
+        }
+
+        if (op.getName().equals(CommonJoinOperator.getOperatorName())) {
+          // Collect the table aliases that the key columns belong to ...
+          Set<String> tableNeedToCheck = new HashSet<String>();
+          for (String keyColumn : keyColumns) {
+            for (ColumnInfo cinfo : originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) {
+              if (keyColumn.equals(cinfo.getInternalName())) {
+                tableNeedToCheck.add(cinfo.getTabAlias());
+              }
+            }
+          }
+          // ... and only search the parents whose row resolver contains one of those aliases.
+          Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+          for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+            Set<String> tableNames =
+                originalOpParseCtx.get(parent).getRowResolver().getTableNames();
+            for (String tbl : tableNames) {
+              if (tableNeedToCheck.contains(tbl)) {
+                correlatedRsOps.addAll(findCorrelatedReduceSinkOperators(parent,
+                    newKeyColumns, correlation));
+              }
+            }
+          }
+
+          // Right now, if any ReduceSinkOperator of this JoinOperator is not correlated, we will
+          // not optimize this query
+          if (correlatedRsOps.size() == op.getParentOperators().size()) {
+            correlatedReduceSinkOps.addAll(correlatedRsOps);
+          } else {
+            correlatedReduceSinkOps.clear();
+          }
+        } else {
+          for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+            correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+                parent, newKeyColumns, correlation));
+          }
+        }
+      } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) {
+        // Case 3: a ReduceSinkOperator; compare the translated key columns with its own keys.
+        Set<String> newKeyColumns = new HashSet<String>();
+        for (String keyColumn : keyColumns) {
+          ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+          if (col instanceof ExprNodeColumnDesc) {
+            newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+          }
+        }
+
+        ReduceSinkOperator rsop = (ReduceSinkOperator) op;
+        Set<String> thisKeyColumns = new HashSet<String>();
+        for (ExprNodeDesc key : rsop.getConf().getKeyCols()) {
+          if (key instanceof ExprNodeColumnDesc) {
+            thisKeyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+          }
+        }
+
+        boolean isCorrelated = false;
+        Set<String> intersection = new HashSet<String>(newKeyColumns);
+        intersection.retainAll(thisKeyColumns);
+        // TODO: should use if intersection is empty to evaluate if two corresponding operators are
+        // correlated
+        // Currently correlated means: every column key of this operator is among the translated
+        // keys, and there is at least one such key.
+        isCorrelated = (intersection.size() == thisKeyColumns.size() && !intersection.isEmpty());
+
+        ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop);
+        // Since we start the search from those reduceSinkOperator at bottom (near FileSinkOperator),
+        // we can always find a reduceSinkOperator at a lower level
+        assert nextChildReduceSinkOperator != null;
+        if (isCorrelated) {
+          if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+              CommonJoinOperator.getOperatorName())) {
+            if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() ||
+                intersection.size() != rsop.getConf().getKeyCols().size()) {
+              // Right now, we can only handle identical join keys.
+              isCorrelated = false;
+            }
+          }
+        }
+
+        if (isCorrelated) {
+          LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated");
+          LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+          LOG.info("--keys of child operator: " + keyColumns.toString());
+          LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+          if (((Operator<? extends OperatorDesc>) (op.getChildOperators().get(0))).getName()
+              .equals(CommonJoinOperator.getOperatorName())) {
+            // This operator feeds a join: also pull in the join's other inputs that are
+            // connected to it through the join conditions.
+            JoinOperator joinOp = (JoinOperator)op.getChildOperators().get(0);
+            JoinCondDesc[] joinConds = joinOp.getConf().getConds();
+            List<Operator<? extends OperatorDesc>> rsOps = joinOp.getParentOperators();
+            Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+            analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, op, correlatedRsOps);
+            correlatedReduceSinkOps.addAll(correlatedRsOps);
+          } else {
+            correlatedReduceSinkOps.add(rsop);
+          }
+          // this if block is useful when we use "isCorrelated = !(intersection.isEmpty());" for
+          // the evaluation of isCorrelated
+          if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+              GroupByOperator.getOperatorName()) &&
+              (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) {
+            LOG.info("--found a RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns. "
+                + " The number of common keys is "
+                + intersection.size()
+                + ", and the number of keys of next group by operator"
+                + nextChildReduceSinkOperator.getConf().getKeyCols().size());
+            correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator);
+          }
+        } else {
+          LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated");
+          LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+          LOG.info("--keys of child operator: " + keyColumns.toString());
+          LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+          correlatedReduceSinkOps.clear();
+          correlation.getRSGBYToBeReplacedByGBYRSGBY().clear();
+        }
+      } else {
+        // Remaining case: a ReduceSinkOperator for which no column expression map was recorded.
+        LOG.error("ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap");
+        throw new SemanticException("CorrelationOptimizer cannot optimize this plan. " +
+            "ReduceSinkOperator " + op.getIdentifier()
+            + " does not have ColumnExprMap");
+      }
+      return correlatedReduceSinkOps;
+    }
+
+    /**
+     * Marks the given ReduceSinkOperator as walked, registers it with the correlation and then
+     * recursively follows the correlated ReduceSinkOperators found above each of its parents.
+     * Every correlated pair is recorded in the up-to-down and down-to-up maps of the
+     * correlation.
+     *
+     * @param op ReduceSinkOperator to start from
+     * @param correlationCtx walker context that keeps track of walked operators
+     * @param correlation correlation being built
+     * @return the ReduceSinkOperators at which the search stopped; empty if at least one key of
+     *         op is not a plain column reference
+     * @throws SemanticException propagated from findCorrelatedReduceSinkOperators
+     */
+    private Set<ReduceSinkOperator> exploitJFC(ReduceSinkOperator op,
+      CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation)
+      throws SemanticException {
+
+      correlationCtx.addWalked(op);
+      correlation.addToAllReduceSinkOperators(op);
+
+      Set<ReduceSinkOperator> result = new HashSet<ReduceSinkOperator>();
+
+      // Detection is only attempted when every key is a plain column reference.
+      boolean allKeysAreColumns = true;
+      Set<String> keyColumns = new HashSet<String>();
+      for (ExprNodeDesc key : op.getConf().getKeyCols()) {
+        if (key instanceof ExprNodeColumnDesc) {
+          keyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+        } else {
+          allKeysAreColumns = false;
+        }
+      }
+      if (!allKeysAreColumns) {
+        return result;
+      }
+
+      Set<ReduceSinkOperator> collected = new HashSet<ReduceSinkOperator>();
+      for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+        LOG.info("Operator " + op.getIdentifier()
+            + ": start detecting correlation from this operator");
+        LOG.info("--keys of this operator: " + keyColumns.toString());
+        Set<ReduceSinkOperator> correlated =
+            findCorrelatedReduceSinkOperators(parent, keyColumns, correlation);
+        if (correlated.isEmpty()) {
+          // Nothing correlated above this parent: the search stops at op itself.
+          collected.add(op);
+          continue;
+        }
+        for (ReduceSinkOperator lower : correlated) {
+          // For two ReduceSinkOperators, we say the one closer to FileSinkOperators is up and
+          // another one is down
+          if (!correlation.getUp2downRSops().containsKey(op)) {
+            correlation.getUp2downRSops().put(op, new ArrayList<ReduceSinkOperator>());
+          }
+          correlation.getUp2downRSops().get(op).add(lower);
+
+          if (!correlation.getDown2upRSops().containsKey(lower)) {
+            correlation.getDown2upRSops().put(lower, new ArrayList<ReduceSinkOperator>());
+          }
+          correlation.getDown2upRSops().get(lower).add(op);
+
+          Set<ReduceSinkOperator> deeper = exploitJFC(lower, correlationCtx, correlation);
+          if (deeper.isEmpty()) {
+            collected.add(lower);
+          } else {
+            collected.addAll(deeper);
+          }
+        }
+      }
+      result.addAll(collected);
+      return result;
+    }
+
+    /**
+     * Follows the first-parent chain above the given operator until a TableScanOperator is
+     * found.
+     *
+     * @param startPoint operator to start from; the search begins at its first parent
+     * @return the TableScanOperator that was reached, or null if a ReduceSinkOperator or an
+     *         operator without a parent list is reached first
+     */
+    private TableScanOperator findTableScanOPerator(Operator<? extends OperatorDesc> startPoint) {
+      Operator<? extends OperatorDesc> current = startPoint.getParentOperators().get(0);
+      for (;;) {
+        if (current.getName().equals(ReduceSinkOperator.getOperatorName())) {
+          return null;
+        }
+        if (current.getName().equals(TableScanOperator.getOperatorName())) {
+          return (TableScanOperator) current;
+        }
+        List<Operator<? extends OperatorDesc>> parents = current.getParentOperators();
+        if (parents == null) {
+          return null;
+        }
+        current = parents.get(0);
+      }
+    }
+
+    /**
+     * Assigns an operation path index to every bottom ReduceSinkOperator of the correlation and
+     * stores the resulting map in the correlation. A bottom ReduceSinkOperator shares its index
+     * with those of its peers that are bottom ReduceSinkOperators as well.
+     *
+     * @param correlation correlation whose bottom ReduceSinkOperators are annotated
+     */
+    private void annotateOpPlan(IntraQueryCorrelation correlation) {
+      Map<ReduceSinkOperator, Integer> pathIndexByBottomRs =
+          new HashMap<ReduceSinkOperator, Integer>();
+      int nextPathIndex = 0;
+      for (ReduceSinkOperator bottomRs : correlation.getBottomReduceSinkOperators()) {
+        if (pathIndexByBottomRs.containsKey(bottomRs)) {
+          // Already numbered as the peer of an earlier operator.
+          continue;
+        }
+        pathIndexByBottomRs.put(bottomRs, nextPathIndex);
+        List<ReduceSinkOperator> peers =
+            CorrelationOptimizerUtils.findPeerReduceSinkOperators(bottomRs);
+        for (ReduceSinkOperator peer : peers) {
+          if (correlation.getBottomReduceSinkOperators().contains(peer)) {
+            pathIndexByBottomRs.put(peer, nextPathIndex);
+          }
+        }
+        nextPathIndex++;
+      }
+      correlation.setBottomReduceSink2OperationPathMap(pathIndexByBottomRs);
+    }
+
+    /**
+     * Visits a ReduceSinkOperator and, together with its peers (the ReduceSinkOperators that
+     * share its child), tries to discover an intra-query correlation.
+     *
+     * Operators that were already walked, that have no key columns, or whose child is neither
+     * a join nor a group-by are only marked as walked. Otherwise the bottom ReduceSinkOperators
+     * reachable from each peer are collected; when they differ from the peers themselves a
+     * job flow correlation is recorded, and the correlation is registered in the context
+     * unless it involves a self-join.
+     *
+     * @param nd the ReduceSinkOperator being visited
+     * @param stack the walker's operator stack (not used here)
+     * @param ctx the CorrelationNodeProcCtx shared by the walk
+     * @param nodeOutputs outputs of previously processed nodes (not used here)
+     * @return always null
+     */
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+
+      CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+      ReduceSinkOperator op = (ReduceSinkOperator) nd;
+      if (correlationCtx.isWalked(op)) {
+        return null;
+      }
+
+      LOG.info("Walk to operator " + op.getIdentifier() + " " + op.getName());
+      addOperatorInfo(op);
+
+      // Only ReduceSinkOperators with key columns that feed a join or a group-by are candidates.
+      // The child is inspected only when there are key columns, as in a short-circuit ||.
+      boolean isCandidate = op.getConf().getKeyCols().size() != 0;
+      if (isCandidate) {
+        String childName = op.getChildOperators().get(0).getName();
+        isCandidate = childName.equals(CommonJoinOperator.getOperatorName())
+            || childName.equals(GroupByOperator.getOperatorName());
+      }
+      if (!isCandidate) {
+        correlationCtx.addWalked(op);
+        return null;
+      }
+
+      // 1: find out correlation
+      IntraQueryCorrelation correlation = new IntraQueryCorrelation();
+      Map<ReduceSinkOperator, List<TableScanOperator>> top2Scans = correlation.getTop2TSops();
+      Map<ReduceSinkOperator, List<TableScanOperator>> bottom2Scans =
+          correlation.getBottom2TSops();
+      List<ReduceSinkOperator> peerReduceSinkOperators =
+          CorrelationOptimizerUtils.findPeerReduceSinkOperators(op);
+      List<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+
+      for (ReduceSinkOperator rsop : peerReduceSinkOperators) {
+        Set<ReduceSinkOperator> bottomsOfPeer = exploitJFC(rsop, correlationCtx, correlation);
+        if (!bottomsOfPeer.isEmpty()) {
+          // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to
+          // TableScanOperators
+          boolean missingTableScan = false;
+          for (ReduceSinkOperator bottomRsop : bottomsOfPeer) {
+            TableScanOperator tsop = findTableScanOPerator(bottomRsop);
+            if (tsop == null) {
+              // currently the optimizer can only optimize correlations involving
+              // source tables (input tables)
+              missingTableScan = true;
+              continue;
+            }
+            // remember the table scan both for the top (peer) ReduceSinkOperator ...
+            if (!top2Scans.containsKey(rsop)) {
+              top2Scans.put(rsop, new ArrayList<TableScanOperator>());
+            }
+            top2Scans.get(rsop).add(tsop);
+            // ... and for the bottom ReduceSinkOperator it was reached from
+            if (!bottom2Scans.containsKey(bottomRsop)) {
+              bottom2Scans.put(bottomRsop, new ArrayList<TableScanOperator>());
+            }
+            bottom2Scans.get(bottomRsop).add(tsop);
+          }
+          if (missingTableScan) {
+            // NOTE(review): entries already added to top2Scans/bottom2Scans for this peer are
+            // not removed here; confirm that later consumers tolerate them.
+            bottomsOfPeer.clear();
+          }
+        }
+        if (bottomsOfPeer.isEmpty()) {
+          // nothing usable below this peer: the peer itself acts as its bottom operator
+          bottomsOfPeer.add(rsop);
+        }
+        bottomReduceSinkOperators.addAll(bottomsOfPeer);
+      }
+
+      if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+        LOG.info("has job flow correlation");
+        correlation.setJobFlowCorrelation(true);
+        correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators);
+        annotateOpPlan(correlation);
+      }
+
+      if (correlation.hasJobFlowCorrelation()) {
+        boolean hasICandTC = findICandTC(correlation);
+        LOG.info("has input correlation and transit correlation? " + hasICandTC);
+        correlation.setInputCorrelation(hasICandTC);
+        correlation.setTransitCorrelation(hasICandTC);
+
+        boolean selfJoin = hasSelfJoin(correlation);
+        LOG.info("has self-join? " + selfJoin);
+        correlation.setInvolveSelfJoin(selfJoin);
+        // TODO: support self-join involved cases. For self-join related operation paths, after the
+        // correlation dispatch operator, each path should be filtered by a filter operator
+        if (selfJoin) {
+          LOG.info("correlation discarded. The current optimizer cannot optimize it");
+        } else {
+          LOG.info("correlation detected");
+          correlationCtx.addCorrelation(correlation);
+        }
+      }
+      correlationCtx.addWalkedAll(peerReduceSinkOperators);
+      return null;
+    }
+
+    /**
+     * Checks whether the correlation involves a self-join, i.e. whether more than one of the
+     * ReduceSinkOperators correlated on the same table are peers of each other.
+     *
+     * @param correlation the correlation whose table-to-ReduceSinkOperator map is inspected
+     * @return true as soon as one such group of peers is found, false otherwise
+     */
+    private boolean hasSelfJoin(IntraQueryCorrelation correlation) {
+      for (List<ReduceSinkOperator> sameTableRSops : correlation
+          .getTable2CorrelatedRSops().values()) {
+        for (ReduceSinkOperator rsop : sameTableRSops) {
+          // peers of rsop (rsop included) that read the same table
+          Set<ReduceSinkOperator> peersOnSameTable = new HashSet<ReduceSinkOperator>(
+              CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop));
+          peersOnSameTable.retainAll(sameTableRSops);
+          if (peersOnSameTable.size() > 1) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Groups the bottom ReduceSinkOperators of the correlation, and their table scans, by the
+     * table name resolved from the alias of each operator's first table scan, and stores both
+     * groupings in the correlation.
+     *
+     * @param correlation the correlation to inspect and update
+     * @return true if at least one table is read by more than one bottom ReduceSinkOperator
+     */
+    private boolean findICandTC(IntraQueryCorrelation correlation) {
+
+      Map<String, List<ReduceSinkOperator>> table2RSops =
+          new HashMap<String, List<ReduceSinkOperator>>();
+      Map<String, List<TableScanOperator>> table2TSops =
+          new HashMap<String, List<TableScanOperator>>();
+
+      for (Entry<ReduceSinkOperator, List<TableScanOperator>> bottom : correlation
+          .getBottom2TSops().entrySet()) {
+        List<TableScanOperator> scans = bottom.getValue();
+        TableScanOperator firstScan = scans.get(0);
+        String tbl = aliastoTabName.get(firstScan.getConf().getAlias());
+        if (!(table2RSops.containsKey(tbl) || table2TSops.containsKey(tbl))) {
+          table2RSops.put(tbl, new ArrayList<ReduceSinkOperator>());
+          table2TSops.put(tbl, new ArrayList<TableScanOperator>());
+        }
+        // each bottom ReduceSinkOperator is expected to be fed by exactly one table scan
+        assert scans.size() == 1;
+        table2RSops.get(tbl).add(bottom.getKey());
+        table2TSops.get(tbl).add(firstScan);
+      }
+
+      correlation.setICandTCCorrelation(table2RSops, table2TSops);
+
+      for (List<ReduceSinkOperator> rsopsOfTable : table2RSops.values()) {
+        if (rsopsOfTable.size() > 1) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+        LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+            + ((Operator) nd).getName() + ". No actual work to do");
+        CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+        if (op.getName().equals(MapJoinOperator.getOperatorName())) {
+          correlationCtx.setAbort(true);
+          correlationCtx.getAbortReasons().add("Found MAPJOIN");
+        }
+        addOperatorInfo((Operator<? extends OperatorDesc>) nd);
+        return null;
+      }
+    };
+  }
+
+  public class IntraQueryCorrelation {
+
+    private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> down2upRSops =
+        new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+    private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> up2downRSops =
+        new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+
+    private final Map<ReduceSinkOperator, List<TableScanOperator>> top2TSops =
+        new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+    private final Map<ReduceSinkOperator, List<TableScanOperator>> bottom2TSops =
+        new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+
+    private List<ReduceSinkOperator> topReduceSinkOperators;
+    private List<ReduceSinkOperator> bottomReduceSinkOperators;
+
+    private Map<String, List<ReduceSinkOperator>> table2CorrelatedRSops;
+
+    private Map<String, List<TableScanOperator>> table2CorrelatedTSops;
+
+    private Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap;
+
+    private final Map<Integer, Map<Integer, List<Integer>>> dispatchConf =
+        new HashMap<Integer, Map<Integer, List<Integer>>>(); // inputTag->(Child->outputTag)
+    private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf =
+        new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+    private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf =
+        new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+
+    private final Set<ReduceSinkOperator> allReduceSinkOperators =
+        new HashSet<ReduceSinkOperator>();
+
+    // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by
+    // GroupBy-ReduceSink-GroupBy pattern.
+    // the type of first GroupByOperator is hash type and this one will be used to group records.
+    private final Set<ReduceSinkOperator> rSGBYToBeReplacedByGBYRSGBY =
+        new HashSet<ReduceSinkOperator>();
+
+    public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) {
+      rSGBYToBeReplacedByGBYRSGBY.add(rsop);
+    }
+
+    public Set<ReduceSinkOperator> getRSGBYToBeReplacedByGBYRSGBY() {
+      return rSGBYToBeReplacedByGBYRSGBY;
+    }
+
+    public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+      allReduceSinkOperators.add(rsop);
+    }
+
+    public Set<ReduceSinkOperator> getAllReduceSinkOperators() {
+      return allReduceSinkOperators;
+    }
+
+    public Map<Integer, Map<Integer, List<Integer>>> getDispatchConf() {
+      return dispatchConf;
+    }
+
+    public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchValueSelectDescConf() {
+      return dispatchValueSelectDescConf;
+    }
+
+    public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchKeySelectDescConf() {
+      return dispatchKeySelectDescConf;
+    }
+
+    public void addOperationPathToDispatchConf(Integer opPlan) {
+      if (!dispatchConf.containsKey(opPlan)) {
+        dispatchConf.put(opPlan, new HashMap<Integer, List<Integer>>());
+      }
+    }
+
+    public Map<Integer, List<Integer>> getDispatchConfForOperationPath(Integer opPlan) {
+      return dispatchConf.get(opPlan);
+    }
+
+    public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) {
+      if (!dispatchValueSelectDescConf.containsKey(opPlan)) {
+        dispatchValueSelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+      }
+    }
+
+    public Map<Integer, List<SelectDesc>> getDispatchValueSelectDescConfForOperationPath(
+        Integer opPlan) {
+      return dispatchValueSelectDescConf.get(opPlan);
+    }
+
+    public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) {
+      if (!dispatchKeySelectDescConf.containsKey(opPlan)) {
+        dispatchKeySelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+      }
+    }
+
+    public Map<Integer, List<SelectDesc>> getDispatchKeySelectDescConfForOperationPath(
+        Integer opPlan) {
+      return dispatchKeySelectDescConf.get(opPlan);
+    }
+
+    private boolean inputCorrelation = false;
+    private boolean transitCorrelation = false;
+    private boolean jobFlowCorrelation = false;
+
+    public void setBottomReduceSink2OperationPathMap(
+        Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap) {
+      this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap;
+    }
+
+    public Map<ReduceSinkOperator, Integer> getBottomReduceSink2OperationPathMap() {
+      return bottomReduceSink2OperationPathMap;
+    }
+
+    public void setInputCorrelation(boolean inputCorrelation) {
+      this.inputCorrelation = inputCorrelation;
+    }
+
+    public boolean hasInputCorrelation() {
+      return inputCorrelation;
+    }
+
+    public void setTransitCorrelation(boolean transitCorrelation) {
+      this.transitCorrelation = transitCorrelation;
+    }
+
+    public boolean hasTransitCorrelation() {
+      return transitCorrelation;
+    }
+
+    public void setJobFlowCorrelation(boolean jobFlowCorrelation) {
+      this.jobFlowCorrelation = jobFlowCorrelation;
+    }
+
+    public boolean hasJobFlowCorrelation() {
+      return jobFlowCorrelation;
+    }
+
+    public Map<ReduceSinkOperator, List<TableScanOperator>> getTop2TSops() {
+      return top2TSops;
+    }
+
+    public Map<ReduceSinkOperator, List<TableScanOperator>> getBottom2TSops() {
+      return bottom2TSops;
+    }
+
+    public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getDown2upRSops() {
+      return down2upRSops;
+    }
+
+    public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getUp2downRSops() {
+      return up2downRSops;
+    }
+
+    public void setJFCCorrelation(List<ReduceSinkOperator> peerReduceSinkOperators,
+        List<ReduceSinkOperator> bottomReduceSinkOperators) {
+      this.topReduceSinkOperators = peerReduceSinkOperators;
+      this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+    }
+
+
+    public List<ReduceSinkOperator> getTopReduceSinkOperators() {
+      return topReduceSinkOperators;
+    }
+
+    public List<ReduceSinkOperator> getBottomReduceSinkOperators() {
+      return bottomReduceSinkOperators;
+    }
+
+    public void setICandTCCorrelation(Map<String, List<ReduceSinkOperator>> table2RSops,
+        Map<String, List<TableScanOperator>> table2TSops) {
+      this.table2CorrelatedRSops = table2RSops;
+      this.table2CorrelatedTSops = table2TSops;
+    }
+
+    public Map<String, List<ReduceSinkOperator>> getTable2CorrelatedRSops() {
+      return table2CorrelatedRSops;
+    }
+
+    public Map<String, List<TableScanOperator>> getTable2CorrelatedTSops() {
+      return table2CorrelatedTSops;
+    }
+
+    private boolean isInvolveSelfJoin = false;
+
+    public boolean isInvolveSelfJoin() {
+      return isInvolveSelfJoin;
+    }
+
+    public void setInvolveSelfJoin(boolean isInvolveSelfJoin) {
+      this.isInvolveSelfJoin = isInvolveSelfJoin;
+    }
+
+  }
+
+  /**
+   * Walker context that carries a single public counter.
+   * NOTE(review): presumably the number of FileSinkOperators seen during the first phase;
+   * confirm against the phase-1 processor.
+   */
+  private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx {
+    // starts at zero (the default value of an int field)
+    public int fileSinkOperatorCount;
+  }
+
+  /**
+   * Walker context for correlation detection. It tracks which ReduceSinkOperators were already
+   * walked, the correlations found so far, and whether (and why) the walk was aborted.
+   */
+  private class CorrelationNodeProcCtx implements NodeProcessorCtx {
+
+    private final Set<ReduceSinkOperator> walked = new HashSet<ReduceSinkOperator>();
+
+    private final List<IntraQueryCorrelation> correlations =
+        new ArrayList<IntraQueryCorrelation>();
+
+    private final List<String> abortReasons = new ArrayList<String>();
+
+    private boolean abort = false;
+
+    public CorrelationNodeProcCtx() {
+      // all state is initialized at the field declarations
+    }
+
+    public boolean isAbort() {
+      return abort;
+    }
+
+    public void setAbort(boolean abort) {
+      this.abort = abort;
+    }
+
+    /** Returns the live list of abort reasons; callers append to it directly. */
+    public List<String> getAbortReasons() {
+      return abortReasons;
+    }
+
+    public List<IntraQueryCorrelation> getCorrelations() {
+      return correlations;
+    }
+
+    public void addCorrelation(IntraQueryCorrelation correlation) {
+      correlations.add(correlation);
+    }
+
+    public boolean isWalked(ReduceSinkOperator op) {
+      return walked.contains(op);
+    }
+
+    public void addWalked(ReduceSinkOperator op) {
+      walked.add(op);
+    }
+
+    public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+      walked.addAll(c);
+    }
+
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,801 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator;
+import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.ForwardDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+
+public final class CorrelationOptimizerUtils {
+
+  static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName());
+
+  public static boolean isExisted(ExprNodeDesc expr, List<ExprNodeDesc> col_list) {
+    for (ExprNodeDesc thisExpr : col_list) {
+      if (expr.getExprString().equals(thisExpr.getExprString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static String getColumnName(Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+    for (Entry<String, ExprNodeDesc> entry : opColumnExprMap.entrySet()) {
+      if (expr.getExprString().equals(entry.getValue().getExprString())) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * Builds one SelectOperator, placed under the shared table scan, that outputs the union of
+   * the columns used by the given ReduceSinkOperators.
+   *
+   * For each ReduceSinkOperator the operator chain below its first table scan is followed
+   * (always through the first child) until a select, filter or reduce sink operator is met:
+   * a select contributes its columns, a reduce sink contributes its key and value columns,
+   * and a filter means all columns are kept, in which case no select is created at all.
+   *
+   * @param rsops the correlated bottom ReduceSinkOperators reading the same table
+   * @param correlation supplies the table scans of each bottom ReduceSinkOperator
+   * @param originalOpColumnExprMap column-expression maps of the original operators
+   * @param input the table scan the new select is attached to
+   * @param pGraphContext parse context; the new select is registered in its OpParseCtx map
+   * @param originalOpParseCtx parse contexts of the original operators
+   * @return the new SelectOperator, or input itself when a filter was encountered
+   */
+  public static Operator<? extends OperatorDesc> unionUsedColumnsAndMakeNewSelect(
+      List<ReduceSinkOperator> rsops,
+      IntraQueryCorrelation correlation, Map<Operator<? extends OperatorDesc>,
+      Map<String, ExprNodeDesc>> originalOpColumnExprMap, TableScanOperator input,
+      ParseContext pGraphContext,
+      Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx) {
+
+    ArrayList<String> columnNames = new ArrayList<String>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> col_list = new ArrayList<ExprNodeDesc>();
+    RowResolver out_rwsch = new RowResolver();
+    boolean isSelectAll = false;
+
+    for (ReduceSinkOperator rsop : rsops) {
+      Operator<? extends OperatorDesc> curr = correlation.getBottom2TSops().get(rsop).get(0)
+          .getChildOperators().get(0);
+      while (true) {
+        if (curr.getName().equals(SelectOperator.getOperatorName())) {
+          SelectOperator selOp = (SelectOperator) curr;
+          if (selOp.getColumnExprMap() != null) {
+            for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+              ExprNodeDesc expr = entry.getValue();
+              // only columns the original row resolver knows about are carried over
+              if (!isExisted(expr, col_list)
+                  && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+                  .containsKey(entry.getKey())) {
+                String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+                    .get(entry.getKey());
+                appendOutputColumn(expr, colRef, entry.getKey(), col_list, columnNames,
+                    colExprMap, out_rwsch);
+              }
+            }
+          } else {
+            for (ExprNodeDesc expr : selOp.getConf().getColList()) {
+              if (!isExisted(expr, col_list)) {
+                String outputName = expr.getCols().get(0);
+                String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver()
+                    .getInvRslvMap().get(outputName);
+                appendOutputColumn(expr, colRef, outputName, col_list, columnNames,
+                    colExprMap, out_rwsch);
+              }
+            }
+          }
+          break;
+        } else if (curr.getName().equals(FilterOperator.getOperatorName())) {
+          isSelectAll = true;
+          break;
+        } else if (curr.getName().equals(ReduceSinkOperator.getOperatorName())) {
+          ReduceSinkOperator thisRSop = (ReduceSinkOperator) curr;
+          // key columns first, then value columns, exactly as they are listed in the descriptor
+          appendReduceSinkColumns(thisRSop.getConf().getKeyCols(), thisRSop,
+              originalOpColumnExprMap, pGraphContext, col_list, columnNames, colExprMap,
+              out_rwsch);
+          appendReduceSinkColumns(thisRSop.getConf().getValueCols(), thisRSop,
+              originalOpColumnExprMap, pGraphContext, col_list, columnNames, colExprMap,
+              out_rwsch);
+          break;
+        } else {
+          curr = curr.getChildOperators().get(0);
+        }
+      }
+    }
+
+    Operator<? extends OperatorDesc> output;
+    if (isSelectAll) {
+      output = input;
+    } else {
+      output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+          new SelectDesc(col_list, columnNames, false), new RowSchema(
+              out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx());
+      output.setColumnExprMap(colExprMap);
+      output.setChildOperators(Utilities.makeList());
+
+    }
+
+    return output;
+  }
+
+  /**
+   * Adds the not-yet-collected expressions of a ReduceSinkOperator (its key or value columns)
+   * to the output column collections. Lookups on the operator's maps are made only for
+   * expressions that are actually added.
+   */
+  private static void appendReduceSinkColumns(List<ExprNodeDesc> exprs,
+      ReduceSinkOperator rsop,
+      Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+      ParseContext pGraphContext, List<ExprNodeDesc> colList, List<String> columnNames,
+      Map<String, ExprNodeDesc> colExprMap, RowResolver outputRowResolver) {
+    for (ExprNodeDesc expr : exprs) {
+      if (isExisted(expr, colList)) {
+        continue;
+      }
+      assert expr.getCols().size() == 1;
+      String columnName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+      String[] colRef = pGraphContext.getOpParseCtx().get(rsop).getRowResolver()
+          .getInvRslvMap().get(columnName);
+      appendOutputColumn(expr, colRef, expr.getCols().get(0), colList, columnNames,
+          colExprMap, outputRowResolver);
+    }
+  }
+
+  /**
+   * Registers expr as a new output column: colRef[0] is used as the table alias and
+   * colRef[1] as the column alias in the output row resolver.
+   */
+  private static void appendOutputColumn(ExprNodeDesc expr, String[] colRef, String outputName,
+      List<ExprNodeDesc> colList, List<String> columnNames,
+      Map<String, ExprNodeDesc> colExprMap, RowResolver outputRowResolver) {
+    colList.add(expr);
+    String tabAlias = colRef[0];
+    String colAlias = colRef[1];
+    outputRowResolver.put(tabAlias, colAlias, new ColumnInfo(
+        outputName, expr.getTypeInfo(), tabAlias, false));
+    columnNames.add(outputName);
+    colExprMap.put(outputName, expr);
+  }
+
+
+  /**
+   * Registers op in the given parse-context map under a new OpParseContext built from rr,
+   * then calls augmentPlan() on the operator.
+   *
+   * @param op the operator to register
+   * @param rr the row resolver describing the operator's output
+   * @param opParseCtx the map the new parse context is stored in
+   * @return op itself
+   */
+  public static Operator<? extends OperatorDesc> putOpInsertMap(
+      Operator<? extends OperatorDesc> op,
+      RowResolver rr, LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
+    opParseCtx.put(op, new OpParseContext(rr));
+    op.augmentPlan();
+    return op;
+  }
+
+  /**
+   * Inverts the topOps map, producing a map from each top operator to the key it is
+   * registered under. Each operator is expected to appear under a single key.
+   *
+   * @param topOps map from key to top operator
+   * @return a new map from top operator to its key
+   */
+  public static Map<Operator<? extends OperatorDesc>, String> getAliasIDtTopOps(
+      Map<String, Operator<? extends OperatorDesc>> topOps) {
+    Map<Operator<? extends OperatorDesc>, String> inverted =
+        new HashMap<Operator<? extends OperatorDesc>, String>();
+    for (Entry<String, Operator<? extends OperatorDesc>> aliasAndOp : topOps.entrySet()) {
+      Operator<? extends OperatorDesc> topOp = aliasAndOp.getValue();
+      assert !inverted.containsKey(topOp);
+      inverted.put(topOp, aliasAndOp.getKey());
+    }
+    return inverted;
+  }
+
+  /**
+   * Collects the peers of op, i.e. every parent of op's single child operator. The result
+   * includes op itself and follows the order of the child's parent list.
+   *
+   * @param op the ReduceSinkOperator whose peers are wanted
+   * @return a new list holding op and its peers
+   */
+  public static List<ReduceSinkOperator> findPeerReduceSinkOperators(ReduceSinkOperator op) {
+    List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+    assert children.size() == 1; // A ReduceSinkOperator should have only one child
+    Operator<? extends OperatorDesc> sharedChild = children.get(0);
+
+    List<ReduceSinkOperator> peers = new ArrayList<ReduceSinkOperator>();
+    for (Operator<? extends OperatorDesc> parentOfChild : sharedChild.getParentOperators()) {
+      assert (parentOfChild instanceof ReduceSinkOperator);
+      peers.add((ReduceSinkOperator) parentOfChild);
+    }
+    return peers;
+  }
+
+  /**
+   * Collects the peers of op, i.e. every parent of op's single child operator, as
+   * CorrelationLocalSimulativeReduceSinkOperators. The result includes op itself.
+   *
+   * @param op the operator whose peers are wanted
+   * @return a new list holding op and its peers, in the order of the child's parent list
+   * @throws ClassCastException if a parent of the child is of a different operator type
+   */
+  public static List<CorrelationLocalSimulativeReduceSinkOperator> findPeerFakeReduceSinkOperators(
+      CorrelationLocalSimulativeReduceSinkOperator op) {
+
+    List<CorrelationLocalSimulativeReduceSinkOperator> peerReduceSinkOperators =
+        new ArrayList<CorrelationLocalSimulativeReduceSinkOperator>();
+
+    List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+    assert children.size() == 1;
+
+    for (Operator<? extends OperatorDesc> parent : children.get(0).getParentOperators()) {
+      // assert the type the cast below actually requires, not plain ReduceSinkOperator
+      assert (parent instanceof CorrelationLocalSimulativeReduceSinkOperator);
+      peerReduceSinkOperators.add((CorrelationLocalSimulativeReduceSinkOperator) parent);
+    }
+
+    return peerReduceSinkOperators;
+  }
+
+  public static boolean applyCorrelation(
+      IntraQueryCorrelation correlation,
+      ParseContext inputpGraphContext,
+      Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+      Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver,
+      Map<ReduceSinkOperator, GroupByOperator> groupbyRegular2MapSide,
+      Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx)
+          throws SemanticException {
+
+    ParseContext pGraphContext = inputpGraphContext;
+
+    // 0: if necessary, replace RS-GBY to GBY-RS-GBY. In GBY-RS-GBY, the type of the first GBY is
+    // hash, so it can group records
+    LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY");
+    for (ReduceSinkOperator rsop : correlation.getRSGBYToBeReplacedByGBYRSGBY()) {
+      LOG.info("operator " + rsop.getIdentifier() + " should be replaced");
+      assert !correlation.getBottomReduceSinkOperators().contains(rsop);
+      GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop);
+      assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator);
+      ReduceSinkOperator newRsop = (ReduceSinkOperator) mapSideGBY.getChildOperators().get(0);
+      GroupByOperator reduceSideGBY = (GroupByOperator) newRsop.getChildOperators().get(0);
+      GroupByOperator oldReduceSideGBY = (GroupByOperator) rsop.getChildOperators().get(0);
+      List<Operator<? extends OperatorDesc>> parents = rsop.getParentOperators();
+      List<Operator<? extends OperatorDesc>> children = oldReduceSideGBY.getChildOperators();
+      mapSideGBY.setParentOperators(parents);
+      for (Operator<? extends OperatorDesc> parent : parents) {
+        parent.replaceChild(rsop, mapSideGBY);
+      }
+      reduceSideGBY.setChildOperators(children);
+      for (Operator<? extends OperatorDesc> child : children) {
+        child.replaceParent(oldReduceSideGBY, reduceSideGBY);
+      }
+      correlation.getAllReduceSinkOperators().remove(rsop);
+      correlation.getAllReduceSinkOperators().add(newRsop);
+    }
+
+
+    Operator<? extends OperatorDesc> curr;
+
+    // 1: Create table scan operator
+    LOG.info("apply correlation step 1: create table scan operator");
+    Map<TableScanOperator, TableScanOperator> oldTSOP2newTSOP =
+        new HashMap<TableScanOperator, TableScanOperator>();
+    Map<String, Operator<? extends OperatorDesc>> oldTopOps = pGraphContext.getTopOps();
+    Map<Operator<? extends OperatorDesc>, String> oldAliasIDtTopOps =
+        getAliasIDtTopOps(oldTopOps);
+    Map<TableScanOperator, Table> oldTopToTable = pGraphContext.getTopToTable();
+    Map<String, Operator<? extends OperatorDesc>> addedTopOps =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+    Map<TableScanOperator, Table> addedTopToTable = new HashMap<TableScanOperator, Table>();
+    for (Entry<String, List<TableScanOperator>> entry : correlation.getTable2CorrelatedTSops()
+        .entrySet()) {
+      TableScanOperator oldTSop = entry.getValue().get(0);
+      TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf()
+          .getVirtualCols());
+      tsDesc.setForwardRowNumber(true);
+      OpParseContext opParseCtx = pGraphContext.getOpParseCtx().get(oldTSop);
+      Operator<? extends OperatorDesc> top = putOpInsertMap(OperatorFactory.get(tsDesc,
+          new RowSchema(opParseCtx.getRowResolver().getColumnInfos())),
+          opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx());
+      top.setParentOperators(null);
+      top.setChildOperators(Utilities.makeList());
+      for (TableScanOperator tsop : entry.getValue()) {
+        addedTopOps.put(oldAliasIDtTopOps.get(tsop), top);
+        addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop));
+        oldTSOP2newTSOP.put(tsop, (TableScanOperator) top);
+      }
+    }
+
+    List<Operator<? extends OperatorDesc>> childrenOfDispatch =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) {
+      // TODO: currently, correlation optimizer can not handle the case that
+      // a table is directly connected to a post computation operator. e.g.
+      //   Join
+      //  /    \
+      // GBY   T2
+      // |
+      // T1
+      if (!correlation.getBottomReduceSinkOperators()
+          .containsAll(findPeerReduceSinkOperators(rsop))) {
+        LOG.info("Can not handle the case that " +
+            "a table is directly connected to a post computation operator. Use original plan");
+        return false;
+      }
+      Operator<? extends OperatorDesc> op = rsop.getChildOperators().get(0);
+      if (!childrenOfDispatch.contains(op)) {
+        LOG.info("Add :" + op.getIdentifier() + " " + op.getName()
+            + " to the children list of dispatch operator");
+        childrenOfDispatch.add(op);
+      }
+    }
+
+    int opTag = 0;
+    Map<Integer, ReduceSinkOperator> operationPath2CorrelationReduceSinkOps =
+        new HashMap<Integer, ReduceSinkOperator>();
+    for (Entry<String, List<ReduceSinkOperator>> entry : correlation
+        .getTable2CorrelatedRSops().entrySet()) {
+
+      // 2: Create select operator for shared operation paths
+      LOG.info("apply correlation step 2: create select operator for shared operation path for the table of "
+          + entry.getKey());
+      curr =
+          unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap,
+              oldTSOP2newTSOP
+              .get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)),
+              pGraphContext, originalOpParseCtx);
+
+      // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator
+      LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of "
+          + entry.getKey());
+      curr =
+          createCorrelationCompositeReducesinkOperaotr(
+              correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(),
+              correlation, curr, pGraphContext,
+              childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag,
+              originalOpRowResolver);
+
+      operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator) curr);
+      opTag++;
+    }
+
+
+    // 4: Create correlation dispatch operator for operation paths
+    LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths");
+    RowResolver outputRS = new RowResolver();
+    List<Operator<? extends OperatorDesc>> correlationReduceSinkOps =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    for (Entry<Integer, ReduceSinkOperator> entry : operationPath2CorrelationReduceSinkOps
+        .entrySet()) {
+      curr = entry.getValue();
+      correlationReduceSinkOps.add(curr);
+      RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver();
+      for (Entry<String, LinkedHashMap<String, ColumnInfo>> e1 : inputRS.getRslvMap().entrySet()) {
+        for (Entry<String, ColumnInfo> e2 : e1.getValue().entrySet()) {
+          outputRS.put(e1.getKey(), e2.getKey(), e2.getValue());
+        }
+      }
+    }
+
+    Operator<? extends OperatorDesc> dispatchOp = putOpInsertMap(OperatorFactory.get(
+        new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation
+            .getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()),
+            new RowSchema(outputRS.getColumnInfos())),
+            outputRS, pGraphContext.getOpParseCtx());
+
+    dispatchOp.setParentOperators(correlationReduceSinkOps);
+    for (Operator<? extends OperatorDesc> thisOp : correlationReduceSinkOps) {
+      thisOp.setChildOperators(Utilities.makeList(dispatchOp));
+    }
+
+    // 5: Replace the old plan in the original plan tree with new plan
+    LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan");
+    Set<Operator<? extends OperatorDesc>> processed =
+        new HashSet<Operator<? extends OperatorDesc>>();
+    for (Operator<? extends OperatorDesc> op : childrenOfDispatch) {
+      List<Operator<? extends OperatorDesc>> parents =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      for (Operator<? extends OperatorDesc> oldParent : op.getParentOperators()) {
+        if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) {
+          parents.add(oldParent);
+        }
+      }
+      parents.add(dispatchOp);
+      op.setParentOperators(parents);
+    }
+    dispatchOp.setChildOperators(childrenOfDispatch);
+    HashMap<String, Operator<? extends OperatorDesc>> newTopOps =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+    for (Entry<String, Operator<? extends OperatorDesc>> entry : oldTopOps.entrySet()) {
+      if (addedTopOps.containsKey(entry.getKey())) {
+        newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey()));
+      } else {
+        newTopOps.put(entry.getKey(), entry.getValue());
+      }
+    }
+    pGraphContext.setTopOps(newTopOps);
+    HashMap<TableScanOperator, Table> newTopToTable = new HashMap<TableScanOperator, Table>();
+    for (Entry<TableScanOperator, Table> entry : oldTopToTable.entrySet()) {
+      if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) {
+        newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()),
+            addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey())));
+      } else {
+        newTopToTable.put(entry.getKey(), entry.getValue());
+      }
+    }
+    pGraphContext.setTopToTable(newTopToTable);
+
+    // 6: Change every JFC related ReduceSinkOperator to a
+    // CorrelationLocalSimulativeReduceSinkOperator
+    LOG.info("apply correlation step 6: Change every JFC related reduce sink operator to a " +
+        "CorrelationLocalSimulativeReduceSinkOperator");
+    for (ReduceSinkOperator rsop : correlation.getAllReduceSinkOperators()) {
+      if (!correlation.getBottomReduceSinkOperators().contains(rsop)) {
+        Operator<? extends OperatorDesc> childOP = rsop.getChildOperators().get(0);
+        Operator<? extends OperatorDesc> parentOP = rsop.getParentOperators().get(0);
+        Operator<? extends OperatorDesc> correlationLocalSimulativeReduceSinkOperator =
+            putOpInsertMap(
+                OperatorFactory.get(
+                    new CorrelationLocalSimulativeReduceSinkDesc(rsop.getConf()),
+                    new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver()
+                        .getColumnInfos())),
+                        pGraphContext.getOpParseCtx().get(rsop).getRowResolver(),
+                        pGraphContext.getOpParseCtx());
+        correlationLocalSimulativeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP));
+        correlationLocalSimulativeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP));
+        parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop),
+            correlationLocalSimulativeReduceSinkOperator);
+        childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop),
+            correlationLocalSimulativeReduceSinkOperator);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Builds the operators that replace a group of correlated ReduceSinkOperators and
+   * records how the dispatch operator has to route and project their rows.
+   *
+   * The generated sub-plan is
+   * <pre>
+   *   input -> (Forward | copied Filter chain), one per entry of rsops
+   *         -> CorrelationCompositeOperator -> one new ReduceSinkOperator
+   * </pre>
+   * The key columns of the new ReduceSinkOperator are the key columns of the first entry
+   * of rsops. Its value columns are the value columns of all entries of rsops,
+   * de-duplicated by their expression string.
+   *
+   * @param tsops
+   *          not used by this method
+   * @param rsops
+   *          the ReduceSinkOperators to replace; must contain at least one entry
+   * @param correlation
+   *          supplies the table scan and the operation path tag of every entry of rsops;
+   *          its dispatch configurations for newTag are filled in by this method
+   * @param input
+   *          the operator feeding the generated chains; its child list is replaced
+   * @param pGraphContext
+   *          parse context whose opParseCtx is read and receives the new operators
+   * @param childrenOfDispatch
+   *          children of the dispatch operator; the index of the first child of each
+   *          entry of rsops in this list is used as key in the dispatch configurations
+   * @param tableName
+   *          table alias given to the output columns of the new ReduceSinkOperator
+   * @param originalOpColumnExprMap
+   *          column expression map of every original operator
+   * @param newTag
+   *          tag of the new ReduceSinkOperator and key of the dispatch configurations
+   * @param originalOpRowResolver
+   *          row resolver of every original operator
+   * @return the new ReduceSinkOperator
+   * @throws SemanticException
+   *           if an output column name is neither "name" nor "alias.name"
+   */
+  public static Operator<? extends OperatorDesc> createCorrelationCompositeReducesinkOperaotr(
+      List<TableScanOperator> tsops,
+      List<ReduceSinkOperator> rsops,
+      IntraQueryCorrelation correlation,
+      Operator<? extends OperatorDesc> input,
+      ParseContext pGraphContext,
+      List<Operator<? extends OperatorDesc>> childrenOfDispatch,
+      String tableName,
+      Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+      int newTag,
+      Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver)
+          throws SemanticException {
+
+    // Create CorrelationCompositeOperator
+    RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+    // Both lists hold one entry per element of rsops, in the same order:
+    // bottoms.get(i) is the first operator of chain i (it becomes a child of input),
+    // tops.get(i) is the last operator of chain i (it becomes a parent of the composite).
+    List<Operator<? extends OperatorDesc>> tops =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    List<Operator<? extends OperatorDesc>> bottoms =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    List<Integer> opTags = new ArrayList<Integer>();
+
+    for (ReduceSinkOperator rsop : rsops) {
+      TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0);
+      // Copy every filter found on the path from the table scan to this reduce sink,
+      // keeping the copies linked in their original order.
+      FilterOperator firstFilOp = null;
+      FilterOperator lastFilOp = null;
+      Operator<? extends OperatorDesc> curr = tsop.getChildOperators().get(0);
+      while (curr != rsop) {
+        if (curr.getName().equals("FIL")) {
+          FilterOperator fil = (FilterOperator) curr;
+          FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false);
+          Operator<FilterDesc> nowFilOp = OperatorFactory.get(FilterDesc.class);
+          nowFilOp.setConf(filterCtx);
+          if (lastFilOp == null) {
+            firstFilOp = (FilterOperator) nowFilOp;
+          } else {
+            nowFilOp.setParentOperators(Utilities.makeList(lastFilOp));
+            lastFilOp.setChildOperators(Utilities.makeList(nowFilOp));
+          }
+          lastFilOp = (FilterOperator) nowFilOp;
+        }
+        curr = curr.getChildOperators().get(0);
+      }
+
+      if (lastFilOp == null) {
+        // no filter needed, just forward
+        ForwardDesc forwardCtx = new ForwardDesc();
+        Operator<ForwardDesc> forwardOp = OperatorFactory.get(ForwardDesc.class);
+        forwardOp.setConf(forwardCtx);
+        tops.add(forwardOp);
+        bottoms.add(forwardOp);
+      } else {
+        // The first copied filter must read from input and the last one must feed the
+        // composite operator. Registering them the other way round would overwrite the
+        // links built above and disconnect any chain with more than one filter.
+        bottoms.add(firstFilOp);
+        tops.add(lastFilOp);
+      }
+      opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop));
+    }
+
+    int[] opTagsArray = new int[opTags.size()];
+    for (int i = 0; i < opTags.size(); i++) {
+      opTagsArray[i] = opTags.get(i).intValue();
+    }
+
+    for (Operator<? extends OperatorDesc> op : bottoms) {
+      op.setParentOperators(Utilities.makeList(input));
+    }
+    input.setChildOperators(bottoms);
+
+    CorrelationCompositeDesc ycoCtx = new CorrelationCompositeDesc();
+    ycoCtx.setAllOperationPathTags(opTagsArray);
+
+    Operator<? extends OperatorDesc> ycop = putOpInsertMap(OperatorFactory.get(ycoCtx,
+        new RowSchema(inputRR.getColumnInfos())),
+        inputRR, pGraphContext.getOpParseCtx());
+    ycop.setParentOperators(tops);
+    for (Operator<? extends OperatorDesc> op : tops) {
+      op.setChildOperators(Utilities.makeList(ycop));
+    }
+
+    // Create CorrelationReduceSinkOperator
+    ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    ArrayList<String> keyOutputColumnNames = new ArrayList<String>();
+    ReduceSinkOperator firstRsop = rsops.get(0);
+
+    RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop);
+    RowResolver outputRS = new RowResolver();
+    // Original key column name -> column of the new reduce sink carrying it.
+    Map<String, ExprNodeColumnDesc> keyCol2ExprForDispatch =
+        new HashMap<String, ExprNodeColumnDesc>();
+    // Value expression string -> column of the new reduce sink carrying it.
+    Map<String, ExprNodeColumnDesc> valueCol2ExprForDispatch =
+        new HashMap<String, ExprNodeColumnDesc>();
+
+    // The keys of the new reduce sink are taken from the first reduce sink only.
+    for (ExprNodeDesc expr : firstRsop.getConf().getKeyCols()) {
+      assert expr instanceof ExprNodeColumnDesc;
+      ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr;
+      String outputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr);
+      ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(
+          orginalFirstRsopRS.getPosition(outputName));
+
+      String col = SemanticAnalyzer.getColumnInternalName(keyCols.size());
+      keyOutputColumnNames.add(col);
+      ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+          .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+
+      colExprMap.put(newColInfo.getInternalName(), expr);
+
+      outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+      keyCols.add(expr);
+
+      keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col,
+          tableName,
+          encd.getIsPartitionColOrVirtualCol()));
+    }
+
+    ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<String> valueOutputColumnNames = new ArrayList<String>();
+
+    correlation.addOperationPathToDispatchConf(newTag);
+    correlation.addOperationPathToDispatchKeySelectDescConf(newTag);
+    correlation.addOperationPathToDispatchValueSelectDescConf(newTag);
+
+    for (ReduceSinkOperator rsop : rsops) {
+      LOG.debug("Analyzing ReduceSinkOperator " + rsop.getIdentifier());
+      RowResolver orginalRS = originalOpRowResolver.get(rsop);
+      Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0));
+      // A tag of -1 is recorded as tag 0 in the dispatch configuration.
+      int outputTag = rsop.getConf().getTag();
+      if (outputTag == -1) {
+        outputTag = 0;
+      }
+      if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) {
+        correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex,
+            new ArrayList<Integer>());
+      }
+      correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag);
+
+      // Key projection the dispatch operator applies for this reduce sink.
+      ArrayList<ExprNodeDesc> thisKeyColsInDispatch = new ArrayList<ExprNodeDesc>();
+      ArrayList<String> outputKeyNamesInDispatch = new ArrayList<String>();
+      for (ExprNodeDesc expr : rsop.getConf().getKeyCols()) {
+        assert expr instanceof ExprNodeColumnDesc;
+        ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr;
+        String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+        LOG.debug("key column: " + outputName);
+        thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn()));
+        outputKeyNamesInDispatch.add(dispatchOutputColumnName(outputName, "key"));
+      }
+
+      if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(
+          childOpIndex)) {
+        correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex,
+            new ArrayList<SelectDesc>());
+      }
+      correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex).
+      add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false));
+
+      // Value projection the dispatch operator applies for this reduce sink.
+      ArrayList<ExprNodeDesc> thisValueColsInDispatch = new ArrayList<ExprNodeDesc>();
+      ArrayList<String> outputValueNamesInDispatch = new ArrayList<String>();
+      for (ExprNodeDesc expr : rsop.getConf().getValueCols()) {
+
+        String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+        LOG.debug("value column: " + outputName);
+        if (LOG.isDebugEnabled()) {
+          // Guarded: the message renders a whole map and a whole column list.
+          LOG.debug("originalOpColumnExprMap.get(rsop):" + originalOpColumnExprMap.get(rsop) +
+              " expr:" + expr.toString() +
+              " orginalRS.getColumnInfos().toString:" + orginalRS.getColumnInfos().toString() + " "
+              + outputName);
+        }
+        ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName));
+        String exprString = expr.getExprString();
+        if (!valueCol2ExprForDispatch.containsKey(exprString)) {
+          // First occurrence of this expression: add it as a new value column.
+          String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size());
+          valueOutputColumnNames.add(col);
+          ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+              .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+          colExprMap.put(newColInfo.getInternalName(), expr);
+          outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+          valueCols.add(expr);
+
+          valueCol2ExprForDispatch.put(exprString, new ExprNodeColumnDesc(
+              cinfo.getType(), col, tableName,
+              false));
+        }
+
+        thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(exprString));
+        outputValueNamesInDispatch.add(dispatchOutputColumnName(outputName, "value"));
+      }
+
+      if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(
+          childOpIndex)) {
+        correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex,
+            new ArrayList<SelectDesc>());
+      }
+      correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex).
+      add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false));
+    }
+
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
+        OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols,
+            keyCols.size(), valueCols, new ArrayList<List<Integer>>(),
+            keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(),
+            -1), new RowSchema(outputRS
+                .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx());
+    rsOp.setColumnExprMap(colExprMap);
+    ((CorrelationCompositeOperator) ycop).getConf().setCorrespondingReduceSinkOperator(rsOp);
+
+    return rsOp;
+  }
+
+  /**
+   * Returns the part of an output column name that follows its optional "alias." prefix.
+   *
+   * @param outputName
+   *          a name of the form "name" or "alias.name"
+   * @param kind
+   *          "key" or "value"; only used in the error message
+   * @throws SemanticException
+   *           if splitting outputName on '.' does not yield one or two parts
+   */
+  private static String dispatchOutputColumnName(String outputName, String kind)
+      throws SemanticException {
+    String[] names = outputName.split("\\.");
+    switch (names.length) {
+    case 1:
+      return names[0];
+    case 2:
+      return names[1];
+    default:
+      throw new SemanticException("found an unsupported internal " + kind
+          + " name structure: " + outputName);
+    }
+  }
+
+
+  /**
+   * Assembles the descriptor of a ReduceSinkOperator: partition columns, an
+   * all-ascending sort order, the key/value table descriptors and the output
+   * column names.
+   *
+   * @param keyCols
+   *          expressions stored in the key
+   * @param numKeys
+   *          number of distribution keys (usually the number of group-by keys)
+   * @param valueCols
+   *          expressions stored in the value
+   * @param distinctColIndices
+   *          column indices for distinct aggregates
+   * @param outputKeyColumnNames
+   *          names of the output key columns; only used when includeKey is true
+   * @param outputValueColumnNames
+   *          names of the output value columns
+   * @param includeKey
+   *          if true the key schema is built from outputKeyColumnNames,
+   *          otherwise the key columns are named "reducesinkkey" + position
+   * @param tag
+   *          tag of the ReduceSinkOperator
+   * @param numPartitionFields
+   *          how many leading entries of keyCols are partition columns; a value
+   *          of at least keyCols.size() uses keyCols itself, a negative value
+   *          partitions on rand()
+   * @param numReducers
+   *          number of reducers, -1 for automatic inference based on input
+   *          data size
+   * @return the ReduceSinkDesc
+   */
+  public static ReduceSinkDesc getReduceSinkDesc(
+      ArrayList<ExprNodeDesc> keyCols, int numKeys,
+      ArrayList<ExprNodeDesc> valueCols,
+      List<List<Integer>> distinctColIndices,
+      ArrayList<String> outputKeyColumnNames, ArrayList<String> outputValueColumnNames,
+      boolean includeKey, int tag,
+      int numPartitionFields, int numReducers) throws SemanticException {
+    final int keyCount = keyCols.size();
+
+    // Partition columns: random, a copied prefix of the keys, or the key list itself.
+    final ArrayList<ExprNodeDesc> partitionCols;
+    if (numPartitionFields < 0) {
+      partitionCols = new ArrayList<ExprNodeDesc>(1);
+      partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor
+          .getFuncExprNodeDesc("rand"));
+    } else if (numPartitionFields < keyCount) {
+      partitionCols = new ArrayList<ExprNodeDesc>(numPartitionFields);
+      partitionCols.addAll(keyCols.subList(0, numPartitionFields));
+    } else {
+      partitionCols = keyCols;
+    }
+
+    // One '+' (ascending) per key column.
+    StringBuilder orderBuf = new StringBuilder(keyCount);
+    for (int remaining = keyCount; remaining > 0; remaining--) {
+      orderBuf.append('+');
+    }
+    final String sortOrder = orderBuf.toString();
+
+    // Key side: either caller-supplied names or generated "reducesinkkeyN" names.
+    final ArrayList<String> outputKeyCols = new ArrayList<String>();
+    final TableDesc keyTable;
+    if (!includeKey) {
+      keyTable = PlanUtils.getReduceKeyTableDesc(
+          PlanUtils.getFieldSchemasFromColumnList(keyCols, "reducesinkkey"), sortOrder);
+      for (int pos = 0; pos < keyCount; pos++) {
+        outputKeyCols.add("reducesinkkey" + pos);
+      }
+    } else {
+      keyTable = PlanUtils.getReduceKeyTableDesc(
+          PlanUtils.getFieldSchemasFromColumnListWithLength(
+              keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""),
+          sortOrder);
+      outputKeyCols.addAll(outputKeyColumnNames);
+    }
+
+    // Value side.
+    final TableDesc valueTable = PlanUtils.getReduceValueTableDesc(
+        PlanUtils.getFieldSchemasFromColumnList(valueCols, outputValueColumnNames, 0, ""));
+    final ArrayList<String> outputValCols = new ArrayList<String>(outputValueColumnNames);
+
+    return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
+        distinctColIndices, outputValCols,
+        tag, partitionCols, numReducers, keyTable,
+        valueTable, true);
+  }
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sun Sep 30 20:41:01 2012
@@ -116,6 +116,11 @@ public final class GenMapRedUtils {
     }
     if (reducer.getClass() == JoinOperator.class) {
       plan.setNeedsTagging(true);
+      plan.setNeedsOperationPathTagging(false);
+    }
+    if (op.getConf().getNeedsOperationPathTagging()) {
+      plan.setNeedsTagging(true);
+      plan.setNeedsOperationPathTagging(true);
     }
 
     assert currTopOp != null;
@@ -182,6 +187,7 @@ public final class GenMapRedUtils {
         opTaskMap.put(reducer, currTask);
         if (reducer.getClass() == JoinOperator.class) {
           plan.setNeedsTagging(true);
+          plan.setNeedsOperationPathTagging(false);
         }
         ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
         plan.setNumReduceTasks(desc.getNumReducers());
@@ -316,6 +322,7 @@ public final class GenMapRedUtils {
 
     if (reducer.getClass() == JoinOperator.class) {
       plan.setNeedsTagging(true);
+      plan.setNeedsOperationPathTagging(false);
     }
 
     initUnionPlan(opProcCtx, unionTask, false);
@@ -1066,6 +1073,7 @@ public final class GenMapRedUtils {
       // dependent on the redTask
       if (reducer.getClass() == JoinOperator.class) {
         cplan.setNeedsTagging(true);
+        cplan.setNeedsOperationPathTagging(false);
       }
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Sun Sep 30 20:41:01 2012
@@ -46,6 +46,18 @@ public class Optimizer {
    */
   public void initialize(HiveConf hiveConf) {
     transformations = new ArrayList<Transform>();
+    // Add correlation optimizer for first phase query plan tree analysis.
+    // The first phase will record original opColumnExprMap, opParseCtx, opRowResolver,
+    // since these may be changed by other optimizers (e.g. entries in opColumnExprMap may be deleted).
+    // If hive.groupby.skewindata is on, CorrelationOptimizer will not be applied.
+    // TODO: Make correlation optimizer 1 phase.
+    CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer();
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+      // TODO: make CorrelationOptimizer compatible with SkewJoinOptimizer
+      transformations.add(correlationOptimizer);
+    }
     // Add the transformation that computes the lineage information.
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
@@ -83,6 +95,13 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
       transformations.add(new GlobalLimitOptimizer());
     }
+    // The second phase of correlation optimizer used for correlation detection and query plan tree transformation.
+    // The second phase should be the last optimizer added into transformations.
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+      transformations.add(correlationOptimizer);
+    }
     transformations.add(new SimpleFetchOptimizer());  // must be called last
   }