Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/18 11:16:53 UTC
svn commit: r1504395 [2/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/if/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Implementation of Correlation Optimizer. This optimizer is based on
+ * the paper "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
+ * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf).
+ * The Correlation Optimizer detects whether ReduceSinkOperators share the same
+ * keys and then transforms the query plan tree (operator tree) by exploiting the
+ * detected correlations. For details, see the original YSmart paper.
+ *
+ * Test queries associated with this optimizer are correlationoptimizer1.q to
+ * correlationoptimizer14.q
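+ *
+ * For illustration (a sketch, not one of the bundled test queries), a query like
+ *   SELECT tmp.key, count(1) FROM
+ *     (SELECT key, avg(value) AS avg FROM t1 GROUP BY key) tmp
+ *     JOIN t1 ON (tmp.key = t1.key)
+ *   GROUP BY tmp.key
+ * shuffles on the same key for the inner GROUP BY, the JOIN, and the outer
+ * GROUP BY, so their ReduceSinkOperators are correlated and the corresponding
+ * MR jobs can be merged into a single job.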
+ */
+public class CorrelationOptimizer implements Transform {
+
+ private static final Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+
+ private boolean abort; // true if the correlation optimizer will not try to optimize this query
+
+ private ParseContext pCtx;
+
+ // Join operators which may be converted by CommonJoinResolver
+ private final Set<Operator<? extends OperatorDesc>> skippedJoinOperators;
+
+ public CorrelationOptimizer() {
+ super();
+ pCtx = null;
+ skippedJoinOperators = new HashSet<Operator<? extends OperatorDesc>>();
+ abort = false;
+ }
+
+ private void findPossibleAutoConvertedJoinOperators() throws SemanticException {
+ // Guess if CommonJoinResolver will work. If CommonJoinResolver may
+ // convert a join operation, the correlation optimizer will not merge that join.
+ // TODO: If hive.auto.convert.join.noconditionaltask=true, for a JoinOperator
+ // that has both intermediate tables and query input tables as input tables,
+ // we should be able to guess if this JoinOperator will be converted to a MapJoin
+ // based on hive.auto.convert.join.noconditionaltask.size.
+ for (JoinOperator joinOp: pCtx.getJoinContext().keySet()) {
+ boolean isAbleToGuess = true;
+ boolean mayConvert = false;
+ // Get total size and individual alias's size
+ long aliasTotalKnownInputSize = 0;
+ Map<String, Long> aliasToSize = new HashMap<String, Long>();
+ Map<Integer, String> posToAlias = new HashMap<Integer, String>();
+ for (Operator<? extends OperatorDesc> op: joinOp.getParentOperators()) {
+ TableScanOperator tsop = CorrelationUtilities.findTableScanOperator(op);
+ if (tsop == null) {
+ isAbleToGuess = false;
+ break;
+ }
+
+ Table table = pCtx.getTopToTable().get(tsop);
+ String alias = tsop.getConf().getAlias();
+ posToAlias.put(joinOp.getParentOperators().indexOf(op), alias);
+ if (table == null) {
+ // table should not be null.
+ throw new SemanticException("The table of " +
+ tsop.getName() + " " + tsop.getIdentifier() +
+ " is null, which is not expected.");
+ }
+
+ Path p = table.getPath();
+ FileSystem fs = null;
+ ContentSummary resultCs = null;
+ try {
+ fs = table.getPath().getFileSystem(pCtx.getConf());
+ resultCs = fs.getContentSummary(p);
+ } catch (IOException e) {
+ LOG.warn("Encounter a error while querying content summary of table " +
+ table.getCompleteName() + " from FileSystem. " +
+ "Cannot guess if CommonJoinOperator will optimize " +
+ joinOp.getName() + " " + joinOp.getIdentifier());
+ }
+ if (resultCs == null) {
+ isAbleToGuess = false;
+ break;
+ }
+
+ long size = resultCs.getLength();
+ aliasTotalKnownInputSize += size;
+ Long es = aliasToSize.get(alias);
+ if(es == null) {
+ es = Long.valueOf(0);
+ }
+ es += size;
+ aliasToSize.put(alias, es);
+ }
+
+ if (!isAbleToGuess) {
+ LOG.info("Cannot guess if CommonJoinOperator will optimize " +
+ joinOp.getName() + " " + joinOp.getIdentifier());
+ continue;
+ }
+
+ JoinDesc joinDesc = joinOp.getConf();
+ Byte[] order = joinDesc.getTagOrder();
+ int numAliases = order.length;
+ HashSet<Integer> bigTableCandidates =
+ MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
+ if (bigTableCandidates == null) {
+ continue;
+ }
+
+ String bigTableAlias = null;
+ long thresholdOfSmallTblSizeSum = HiveConf.getLongVar(pCtx.getConf(),
+ HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
+ for (int i = 0; i < numAliases; i++) {
+ // this table cannot be big table
+ if (!bigTableCandidates.contains(i)) {
+ continue;
+ }
+ bigTableAlias = posToAlias.get(i);
+ if (!CommonJoinTaskDispatcher.cannotConvert(bigTableAlias, aliasToSize,
+ aliasTotalKnownInputSize, thresholdOfSmallTblSizeSum)) {
+ mayConvert = true;
+ }
+ }
+
+ if (mayConvert) {
+ LOG.info(joinOp.getName() + " " + joinOp.getIdentifier() +
+ " may be converted to MapJoin by CommonJoinResolver");
+ skippedJoinOperators.add(joinOp);
+ }
+ }
+ }
+
+ /**
+ * Detect correlations and transform the query tree.
+ *
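+ * Like any other Transform, this optimizer runs on the ParseContext produced
+ * by semantic analysis; a minimal sketch of the assumed call pattern (not code
+ * from this patch) is:
+ *   ParseContext optimized = new CorrelationOptimizer().transform(pctx);
+ *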
+ * @param pctx
+ * current parse context
+ * @throws SemanticException
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ pCtx = pctx;
+
+ if (HiveConf.getBoolVar(pCtx.getConf(), HiveConf.ConfVars.HIVECONVERTJOIN)) {
+ findPossibleAutoConvertedJoinOperators();
+ }
+
+ // detect correlations
+ CorrelationNodeProcCtx corrCtx = new CorrelationNodeProcCtx(pCtx);
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+ new CorrelationNodeProc());
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, corrCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ // We have finished tree walking (correlation detection).
+ // We will first see if we need to abort (the operator tree has not been changed).
+ // If not, we will start to transform the operator tree.
+ abort = corrCtx.isAbort();
+ if (abort) {
+ LOG.info("Abort. Reasons are ...");
+ for (String reason : corrCtx.getAbortReasons()) {
+ LOG.info("-- " + reason);
+ }
+ } else {
+ // transform the operator tree
+ LOG.info("Begain query plan transformation based on intra-query correlations. " +
+ corrCtx.getCorrelations().size() + " correlation(s) to be applied");
+ for (IntraQueryCorrelation correlation : corrCtx.getCorrelations()) {
+ QueryPlanTreeTransformation.applyCorrelation(pCtx, corrCtx, correlation);
+ }
+ }
+ return pCtx;
+ }
+
+ private class CorrelationNodeProc implements NodeProcessor {
+
+ private void analyzeReduceSinkOperatorsOfJoinOperator(JoinCondDesc[] joinConds,
+ List<Operator<? extends OperatorDesc>> rsOps, Operator<? extends OperatorDesc> currentRsOp,
+ Set<ReduceSinkOperator> correlatedRsOps) {
+ if (correlatedRsOps.contains((ReduceSinkOperator) currentRsOp)) {
+ return;
+ }
+ correlatedRsOps.add((ReduceSinkOperator) currentRsOp);
+
+ int pos = rsOps.indexOf(currentRsOp);
+ for (int i = 0; i < joinConds.length; i++) {
+ JoinCondDesc joinCond = joinConds[i];
+ int type = joinCond.getType();
+ if (pos == joinCond.getLeft()) {
+ if (type == JoinDesc.INNER_JOIN ||
+ type == JoinDesc.LEFT_OUTER_JOIN ||
+ type == JoinDesc.LEFT_SEMI_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getRight());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ } else if (pos == joinCond.getRight()) {
+ if (type == JoinDesc.INNER_JOIN || type == JoinDesc.RIGHT_OUTER_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getLeft());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ }
+ }
+ }
+
+ private boolean sameKeys(List<ExprNodeDesc> k1, List<ExprNodeDesc> k2) {
+ if (k1.size() != k2.size()) {
+ return false;
+ }
+ for (int i = 0; i < k1.size(); i++) {
+ ExprNodeDesc expr1 = k1.get(i);
+ ExprNodeDesc expr2 = k2.get(i);
+ if (expr1 == null) {
+ if (expr2 == null) {
+ continue;
+ } else {
+ return false;
+ }
+ } else {
+ if (!expr1.isSame(expr2)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean sameOrder(String order1, String order2) {
+ if (order1 == null || order1.trim().equals("")) {
+ if (order2 == null || order2.trim().equals("")) {
+ return true;
+ }
+ return false;
+ }
+ if (order2 == null || order2.trim().equals("")) {
+ return false;
+ }
+ order1 = order1.trim();
+ order2 = order2.trim();
+ return order1.equals(order2);
+ }
+ /**
+ * This method is used to recursively traverse the tree to find
+ * ReduceSinkOperators which share the same key columns and partitioning
+ * columns. Those ReduceSinkOperators are called correlated ReduceSinkOperators.
+ *
+ * @param child The child of the current operator
+ * @param childKeyCols The key columns from the child operator
+ * @param childPartitionCols The partitioning columns from the child operator
+ * @param childRSOrder The sorting order of key columns from the child operator
+ * @param current The current operator we are visiting
+ * @param correlation The object keeps tracking the correlation
+ * @return the set of correlated ReduceSinkOperators found
+ * @throws SemanticException
+ */
+ private LinkedHashSet<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+ Operator<? extends OperatorDesc> child,
+ List<ExprNodeDesc> childKeyCols, List<ExprNodeDesc> childPartitionCols,
+ String childRSOrder,
+ Operator<? extends OperatorDesc> current,
+ IntraQueryCorrelation correlation) throws SemanticException {
+
+ LOG.info("now detecting operator " + current.getIdentifier() + " " + current.getName());
+
+ LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
+ new LinkedHashSet<ReduceSinkOperator>();
+ if (skippedJoinOperators.contains(current)) {
+ LOG.info(current.getName() + " " + current.getIdentifier() +
+ " may be converted to MapJoin by " +
+ "CommonJoinResolver. The correlation optimizer will not detect correlations " +
+ "involved in this operator");
+ return correlatedReduceSinkOperators;
+ }
+ if (current.getParentOperators() == null) {
+ return correlatedReduceSinkOperators;
+ }
+ if (current instanceof PTFOperator) {
+ // Currently, we do not support PTF operator.
+ LOG.info("Currently, correlation optimizer does not support PTF operator.");
+ return correlatedReduceSinkOperators;
+ }
+ if (current instanceof UnionOperator) {
+ // If we get a UnionOperator, right now, we only handle it when
+ // we can find correlated ReduceSinkOperators from all inputs.
+ LinkedHashSet<ReduceSinkOperator> corrRSs = new LinkedHashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+ LinkedHashSet<ReduceSinkOperator> tmp =
+ findCorrelatedReduceSinkOperators(
+ current, childKeyCols, childPartitionCols, childRSOrder, parent, correlation);
+ if (tmp != null && tmp.size() > 0) {
+ corrRSs.addAll(tmp);
+ } else {
+ return correlatedReduceSinkOperators;
+ }
+ }
+ correlatedReduceSinkOperators.addAll(corrRSs);
+ UnionOperator union = (UnionOperator)current;
+ union.getConf().setAllInputsInSameReducer(true);
+ } else if (current.getColumnExprMap() == null && !(current instanceof ReduceSinkOperator)) {
+ for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+ correlatedReduceSinkOperators.addAll(
+ findCorrelatedReduceSinkOperators(
+ current, childKeyCols, childPartitionCols, childRSOrder, parent, correlation));
+ }
+ } else if (current.getColumnExprMap() != null && !(current instanceof ReduceSinkOperator)) {
+ List<ExprNodeDesc> backtrackedKeyCols =
+ ExprNodeDescUtils.backtrack(childKeyCols, child, current);
+ List<ExprNodeDesc> backtrackedPartitionCols =
+ ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+ Set<String> tableNeedToCheck = new HashSet<String>();
+ for (ExprNodeDesc expr: childKeyCols) {
+ if (!(expr instanceof ExprNodeColumnDesc)) {
+ return correlatedReduceSinkOperators;
+ } else {
+ String colName = ((ExprNodeColumnDesc)expr).getColumn();
+ OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
+ for (ColumnInfo cinfo : opCtx.getRowResolver().getColumnInfos()) {
+ if (colName.equals(cinfo.getInternalName())) {
+ tableNeedToCheck.add(cinfo.getTabAlias());
+ }
+ }
+ }
+ }
+ if (current instanceof JoinOperator) {
+ LinkedHashSet<ReduceSinkOperator> correlatedRsOps =
+ new LinkedHashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+ Set<String> tableNames =
+ pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames();
+ for (String tbl : tableNames) {
+ if (tableNeedToCheck.contains(tbl)) {
+ correlatedRsOps.addAll(findCorrelatedReduceSinkOperators(
+ current, backtrackedKeyCols, backtrackedPartitionCols, childRSOrder,
+ parent, correlation));
+ }
+ }
+ }
+ // If current is a JoinOperator, we stop traversing the tree
+ // when any parent ReduceSinkOperator of this JoinOperator is
+ // not considered a correlated ReduceSinkOperator.
+ if (correlatedRsOps.size() == current.getParentOperators().size()) {
+ correlatedReduceSinkOperators.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOperators.clear();
+ }
+ } else {
+ for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
+ correlatedReduceSinkOperators.addAll(findCorrelatedReduceSinkOperators(
+ current, backtrackedKeyCols, backtrackedPartitionCols, childRSOrder,
+ parent, correlation));
+ }
+ }
+ } else if (current.getColumnExprMap() != null && current instanceof ReduceSinkOperator) {
+ ReduceSinkOperator rsop = (ReduceSinkOperator) current;
+ List<ExprNodeDesc> backtrackedKeyCols =
+ ExprNodeDescUtils.backtrack(childKeyCols, child, current);
+ List<ExprNodeDesc> backtrackedPartitionCols =
+ ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+ List<ExprNodeDesc> rsKeyCols = rsop.getConf().getKeyCols();
+ List<ExprNodeDesc> rsPartitionCols = rsop.getConf().getPartitionCols();
+
+ // Two ReduceSinkOperators are correlated if they have
+ // the same sorting columns (key columns), the same partitioning columns,
+ // the same sort orders, and no conflict on the numbers of reducers.
+ // TODO: we should relax this condition
+ // TODO: we need to handle aggregation functions with distinct keyword. In this case,
+ // distinct columns will be added to the key columns.
+ boolean isCorrelated = sameKeys(rsKeyCols, backtrackedKeyCols) &&
+ sameOrder(rsop.getConf().getOrder(), childRSOrder) &&
+ sameKeys(backtrackedPartitionCols, rsPartitionCols) &&
+ correlation.adjustNumReducers(rsop.getConf().getNumReducers());
+ GroupByOperator cGBY =
+ CorrelationUtilities.getSingleChild(rsop, GroupByOperator.class);
+ if (cGBY != null) {
+ if (CorrelationUtilities.hasGroupingSet(rsop) ||
+ cGBY.getConf().isGroupingSetsPresent()) {
+ // Do not support grouping set right now
+ isCorrelated = false;
+ }
+ }
+
+ if (isCorrelated) {
+ LOG.info("Operator " + current.getIdentifier() + " " +
+ current.getName() + " is correlated");
+ Operator<? extends OperatorDesc> childOperator =
+ CorrelationUtilities.getSingleChild(current, true);
+ if (childOperator instanceof JoinOperator) {
+ JoinOperator joinOp = (JoinOperator) childOperator;
+ JoinCondDesc[] joinConds = joinOp.getConf().getConds();
+ List<Operator<? extends OperatorDesc>> rsOps = joinOp.getParentOperators();
+ LinkedHashSet<ReduceSinkOperator> correlatedRsOps =
+ new LinkedHashSet<ReduceSinkOperator>();
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, current, correlatedRsOps);
+ correlatedReduceSinkOperators.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOperators.add(rsop);
+ }
+ } else {
+ LOG.info("Operator " + current.getIdentifier() + " " +
+ current.getName() + " is not correlated");
+ correlatedReduceSinkOperators.clear();
+ }
+ } else {
+ LOG.error("ReduceSinkOperator " + current.getIdentifier() + " does not have ColumnExprMap");
+ throw new SemanticException("CorrelationOptimizer cannot optimize this plan. " +
+ "ReduceSinkOperator " + current.getIdentifier()
+ + " does not have ColumnExprMap");
+ }
+ return correlatedReduceSinkOperators;
+ }
+
+ /** Start to exploit Job Flow Correlation from op.
+ * Example: here is the operator tree we have ...
+ * JOIN2
+ * / \
+ * RS4 RS5
+ * / \
+ * GBY1 JOIN1
+ * | / \
+ * RS1 RS2 RS3
+ * The op will be RS4. If GBY1, JOIN1, and JOIN2 can be executed in
+ * the same reducer, this method will return [RS1, RS2, RS3].
+ * @param op
+ * @param correlationCtx
+ * @param correlation
+ * @return the bottom layer ReduceSinkOperators reachable from op
+ * @throws SemanticException
+ */
+ private LinkedHashSet<ReduceSinkOperator> exploitJobFlowCorrelation(ReduceSinkOperator op,
+ CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation)
+ throws SemanticException {
+ correlationCtx.addWalked(op);
+ correlation.addToAllReduceSinkOperators(op);
+ boolean shouldDetect = true;
+ LinkedHashSet<ReduceSinkOperator> reduceSinkOperators =
+ new LinkedHashSet<ReduceSinkOperator>();
+ List<ExprNodeDesc> keyCols = op.getConf().getKeyCols();
+ List<ExprNodeDesc> partitionCols = op.getConf().getPartitionCols();
+ for (ExprNodeDesc key : keyCols) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ }
+ }
+ for (ExprNodeDesc key : partitionCols) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ }
+ }
+ GroupByOperator cGBY =
+ CorrelationUtilities.getSingleChild(op, GroupByOperator.class);
+ if (cGBY != null) {
+ if (CorrelationUtilities.hasGroupingSet(op) ||
+ cGBY.getConf().isGroupingSetsPresent()) {
+ // Do not support grouping set right now
+ shouldDetect = false;
+ }
+ }
+
+ if (shouldDetect) {
+ LinkedHashSet<ReduceSinkOperator> newReduceSinkOperators =
+ new LinkedHashSet<ReduceSinkOperator>();
+ String sortOrder = op.getConf().getOrder();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ LOG.info("Operator " + op.getIdentifier()
+ + ": start detecting correlation from this operator");
+ LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
+ findCorrelatedReduceSinkOperators(op, keyCols, partitionCols,
+ sortOrder, parent, correlation);
+ if (correlatedReduceSinkOperators.size() == 0) {
+ newReduceSinkOperators.add(op);
+ } else {
+ for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) {
+ LinkedHashSet<ReduceSinkOperator> exploited =
+ exploitJobFlowCorrelation(rsop, correlationCtx, correlation);
+ if (exploited.size() == 0) {
+ newReduceSinkOperators.add(rsop);
+ } else {
+ newReduceSinkOperators.addAll(exploited);
+ }
+ }
+ }
+ }
+ reduceSinkOperators.addAll(newReduceSinkOperators);
+ }
+ return reduceSinkOperators;
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ CorrelationNodeProcCtx corrCtx = (CorrelationNodeProcCtx) ctx;
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+
+ // Check if we have visited this operator
+ if (corrCtx.isWalked(op)) {
+ return null;
+ }
+
+ LOG.info("Walk to operator " + op.getIdentifier() + " " + op.getName());
+
+ Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(op, true);
+ if (!(child instanceof JoinOperator) && !(child instanceof GroupByOperator)) {
+ corrCtx.addWalked(op);
+ return null;
+ }
+
+ // detect correlations
+ IntraQueryCorrelation correlation = new IntraQueryCorrelation(corrCtx.minReducer());
+ List<ReduceSinkOperator> topReduceSinkOperators =
+ CorrelationUtilities.findSiblingReduceSinkOperators(op);
+ List<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ // Adjust the number of reducers of this correlation based on
+ // those top layer ReduceSinkOperators.
+ for (ReduceSinkOperator rsop : topReduceSinkOperators) {
+ if (!correlation.adjustNumReducers(rsop.getConf().getNumReducers())) {
+ // If we have a conflict on the number of reducers, we will not optimize
+ // this plan from here.
+ corrCtx.addWalked(op);
+ return null;
+ }
+ }
+ for (ReduceSinkOperator rsop : topReduceSinkOperators) {
+ LinkedHashSet<ReduceSinkOperator> thisBottomReduceSinkOperators =
+ exploitJobFlowCorrelation(rsop, corrCtx, correlation);
+ if (thisBottomReduceSinkOperators.size() == 0) {
+ thisBottomReduceSinkOperators.add(rsop);
+ }
+ bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+ }
+
+ if (!topReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+ LOG.info("has job flow correlation");
+ correlation.setJobFlowCorrelation(true, bottomReduceSinkOperators);
+ }
+
+ if (correlation.hasJobFlowCorrelation()) {
+ corrCtx.addCorrelation(correlation);
+ } else {
+ // Since we cannot merge operators into a single MR job from here, we should
+ // remove the ReduceSinkOperators that exploitJobFlowCorrelation added to the walked set
+ corrCtx.removeWalkedAll(correlation.getAllReduceSinkOperators());
+ }
+
+ corrCtx.addWalked(op);
+ return null;
+ }
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ LOG.info("Walk to operator " + op.getIdentifier() + " "
+ + op.getName() + ". No actual work to do");
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+ if (op.getName().equals(MapJoinOperator.getOperatorName())) {
+ correlationCtx.setAbort(true);
+ correlationCtx.getAbortReasons().add("Found MAPJOIN");
+ }
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ correlationCtx.incrementFileSinkOperatorCount();
+ }
+ return null;
+ }
+ };
+ }
+
+ protected class CorrelationNodeProcCtx extends AbstractCorrelationProcCtx {
+
+ private boolean abort;
+ private final List<String> abortReasons;
+
+ private final Set<ReduceSinkOperator> walked;
+
+ private final List<IntraQueryCorrelation> correlations;
+
+ private int fileSinkOperatorCount;
+
+ public CorrelationNodeProcCtx(ParseContext pctx) {
+ super(pctx);
+ walked = new HashSet<ReduceSinkOperator>();
+ correlations = new ArrayList<IntraQueryCorrelation>();
+ abort = false;
+ abortReasons = new ArrayList<String>();
+ fileSinkOperatorCount = 0;
+ }
+
+ public void setAbort(boolean abort) {
+ this.abort = abort;
+ }
+
+ public boolean isAbort() {
+ return abort;
+ }
+
+ public List<String> getAbortReasons() {
+ return abortReasons;
+ }
+
+ public void addCorrelation(IntraQueryCorrelation correlation) {
+ correlations.add(correlation);
+ }
+
+ public List<IntraQueryCorrelation> getCorrelations() {
+ return correlations;
+ }
+
+ public boolean isWalked(ReduceSinkOperator op) {
+ return walked.contains(op);
+ }
+
+ public void addWalked(ReduceSinkOperator op) {
+ walked.add(op);
+ }
+
+ public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+ walked.addAll(c);
+ }
+
+ public void removeWalked(ReduceSinkOperator op) {
+ walked.remove(op);
+ }
+
+ public void removeWalkedAll(Collection<ReduceSinkOperator> c) {
+ walked.removeAll(c);
+ }
+
+ public void incrementFileSinkOperatorCount() {
+ fileSinkOperatorCount++;
+ if (fileSinkOperatorCount == 2) {
+ abort = true;
+ abortReasons.add(
+ "Currently, a query with multiple FileSinkOperators is not supported.");
+ }
+ }
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+/**
+ * Utilities for both CorrelationOptimizer and ReduceSinkDeDuplication.
+ *
+ */
+public final class CorrelationUtilities {
+
+ protected static boolean isExisted(ExprNodeDesc expr, List<ExprNodeDesc> columns) {
+ for (ExprNodeDesc thisExpr : columns) {
+ if (thisExpr != null && thisExpr.isSame(expr)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected static String getColumnName(
+ Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+ for (Entry<String, ExprNodeDesc> entry : opColumnExprMap.entrySet()) {
+ ExprNodeDesc thisExpr = entry.getValue();
+ if (thisExpr != null && thisExpr.isSame(expr)) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ protected static boolean hasGroupingSet(ReduceSinkOperator cRS) throws SemanticException {
+ GroupByOperator cGBYm = getSingleParent(cRS, GroupByOperator.class);
+ if (cGBYm != null && cGBYm.getConf().isGroupingSetsPresent()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param operator the input operator
+ * @param throwException whether to throw an exception when the input operator
+ * does not have exactly one parent
+ * @return the single parent, or null when the input operator does not have exactly
+ * one parent and throwException is false
+ * @throws SemanticException
+ */
+ protected static Operator<?> getSingleParent(Operator<?> operator,
+ boolean throwException) throws SemanticException {
+ List<Operator<?>> parents = operator.getParentOperators();
+ if (parents != null && parents.size() == 1) {
+ return parents.get(0);
+ }
+ if (throwException) {
+ if (parents == null) {
+ throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+ operator.getIdentifier() + ") does not have any parent, but we expect 1 parent.");
+ } else if (parents.size() > 1) {
+ throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+ operator.getIdentifier() + ") has " + parents.size() +
+ " parents, but we expect 1 parent.");
+ }
+ }
+ return null;
+ }
+
+ protected static Operator<?> getSingleParent(Operator<?> operator) throws SemanticException {
+ return getSingleParent(operator, false);
+ }
+
+ /**
+ * @param operator the input operator
+ * @param throwException whether to throw an exception when the input operator
+ * does not have exactly one child
+ * @return the single child, or null when the input operator does not have exactly
+ * one child and throwException is false
+ * @throws SemanticException
+ */
+ protected static Operator<?> getSingleChild(Operator<?> operator,
+ boolean throwException) throws SemanticException {
+ List<Operator<?>> children = operator.getChildOperators();
+ if (children != null && children.size() == 1) {
+ return children.get(0);
+ }
+ if (throwException) {
+ if (children == null) {
+ throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+ operator.getIdentifier() + ") does not have any parent, but we expect 1 parent.");
+ } else if (children.size() > 1) {
+ throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+ operator.getIdentifier() + ") has " + children.size() +
+ " parents, but we expect 1 parent.");
+ }
+ }
+ return null;
+ }
+
+ protected static Operator<?> getSingleChild(Operator<?> operator) throws SemanticException {
+ return getSingleChild(operator, false);
+ }
+
+ protected static <T> T getSingleChild(Operator<?> operator, Class<T> type)
+ throws SemanticException {
+ Operator<?> child = getSingleChild(operator);
+ return type.isInstance(child) ? (T)child : null;
+ }
+
+ protected static <T> T getSingleParent(Operator<?> operator, Class<T> type)
+ throws SemanticException {
+ Operator<?> parent = getSingleParent(operator);
+ return type.isInstance(parent) ? (T)parent : null;
+ }
+
+ protected static Operator<?> getStartForGroupBy(ReduceSinkOperator cRS)
+ throws SemanticException {
+ Operator<? extends Serializable> parent = getSingleParent(cRS);
+ return parent instanceof GroupByOperator ? parent : cRS; // skip map-aggr GBY
+ }
+
+
+ protected static boolean[] getSortedTags(JoinOperator joinOp) {
+ boolean[] result = new boolean[joinOp.getParentOperators().size()];
+ for (int tag = 0; tag < result.length; tag++) {
+ result[tag] = isSortedTag(joinOp, tag);
+ }
+ return result;
+ }
+
+ // For left outer joins, the left alias is sorted but the right alias might
+ // not be (nulls, etc.), and vice versa.
+ protected static boolean isSortedTag(JoinOperator joinOp, int tag) {
+ for (JoinCondDesc cond : joinOp.getConf().getConds()) {
+ switch (cond.getType()) {
+ case JoinDesc.LEFT_OUTER_JOIN:
+ if (cond.getRight() == tag) {
+ return false;
+ }
+ continue;
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ if (cond.getLeft() == tag) {
+ return false;
+ }
+ continue;
+ case JoinDesc.FULL_OUTER_JOIN:
+ if (cond.getLeft() == tag || cond.getRight() == tag) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ protected static int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child,
+ Operator[] parents, boolean[] sorted) throws SemanticException {
+ for (int tag = 0; tag < parents.length; tag++) {
+ if (sorted[tag] &&
+ pexprs[tag].isSame(ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]))) {
+ return tag;
+ }
+ }
+ return -1;
+ }
+
+ protected static <T extends Operator<?>> T findPossibleParent(Operator<?> start, Class<T> target,
+ boolean trustScript) throws SemanticException {
+ T[] parents = findPossibleParents(start, target, trustScript);
+ return parents != null && parents.length == 1 ? parents[0] : null;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <T extends Operator<?>> T[] findPossibleParents(
+ Operator<?> start, Class<T> target,
+ boolean trustScript) throws SemanticException {
+ Operator<?> cursor = getSingleParent(start);
+ for (; cursor != null; cursor = getSingleParent(cursor)) {
+ if (target.isAssignableFrom(cursor.getClass())) {
+ T[] array = (T[]) Array.newInstance(target, 1);
+ array[0] = (T) cursor;
+ return array;
+ }
+ if (cursor instanceof JoinOperator) {
+ return findParents((JoinOperator) cursor, target);
+ }
+ if (cursor instanceof ScriptOperator && !trustScript) {
+ return null;
+ }
+ if (!(cursor instanceof SelectOperator
+ || cursor instanceof FilterOperator
+ || cursor instanceof ExtractOperator
+ || cursor instanceof ForwardOperator
+ || cursor instanceof ScriptOperator
+ || cursor instanceof ReduceSinkOperator)) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <T extends Operator<?>> T[] findParents(JoinOperator join, Class<T> target)
+ throws SemanticException {
+ List<Operator<?>> parents = join.getParentOperators();
+ T[] result = (T[]) Array.newInstance(target, parents.size());
+ for (int tag = 0; tag < result.length; tag++) {
+ Operator<?> cursor = parents.get(tag);
+ for (; cursor != null; cursor = getSingleParent(cursor)) {
+ if (target.isAssignableFrom(cursor.getClass())) {
+ result[tag] = (T) cursor;
+ break;
+ }
+ }
+ if (result[tag] == null) {
+ throw new IllegalStateException("failed to find " + target.getSimpleName()
+ + " from " + join + " on tag " + tag);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Search the query plan tree from startPoint to the bottom. If there is no ReduceSinkOperator
+ * between startPoint and the corresponding TableScanOperator, return the corresponding
+ * TableScanOperator. Otherwise, return null.
+ * @param startPoint the operator which the search will start at
+ * @return the TableScanOperator traced from startPoint. Null, if the search encounters any
+ * ReduceSinkOperator.
+ */
+ protected static TableScanOperator findTableScanOperator(
+ Operator<? extends OperatorDesc> startPoint) {
+ Operator<? extends OperatorDesc> thisOp = startPoint.getParentOperators().get(0);
+ while (true) {
+ if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ return null;
+ } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) {
+ return (TableScanOperator) thisOp;
+ } else {
+ if (thisOp.getParentOperators() != null) {
+ thisOp = thisOp.getParentOperators().get(0);
+ } else {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find all sibling ReduceSinkOperators of op, i.e., those sharing the same
+ * child operator as op (op itself included).
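+ * For example (illustrative), if RS2 and RS3 both feed JOIN1, then
+ * findSiblingReduceSinkOperators(RS2) returns [RS2, RS3].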
+ * @throws SemanticException
+ */
+ public static List<ReduceSinkOperator> findSiblingReduceSinkOperators(ReduceSinkOperator op)
+ throws SemanticException {
+ List<ReduceSinkOperator> siblingRSs = new ArrayList<ReduceSinkOperator>();
+ Operator<? extends OperatorDesc> child = getSingleChild(op, true);
+ for (Operator<? extends OperatorDesc> parent: child.getParentOperators()) {
+ if (parent instanceof ReduceSinkOperator) {
+ siblingRSs.add((ReduceSinkOperator)parent);
+ } else {
+ throw new SemanticException("An sibling of a ReduceSinkOperatpr is not a" +
+ "ReduceSinkOperatpr.");
+ }
+ }
+ return siblingRSs;
+ }
+
+ /**
+ * Find all sibling operators (which have the same child operator of op) of op (op
+ * included).
+ * @throws SemanticException
+ */
+ public static List<Operator<? extends OperatorDesc>> findSiblingOperators(
+ Operator<? extends OperatorDesc> op)
+ throws SemanticException {
+ Operator<? extends OperatorDesc> child = getSingleChild(op, true);
+ return child.getParentOperators();
+ }
+
+ protected static SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS,
+ ParseContext context, AbstractCorrelationProcCtx procCtx) throws SemanticException {
+ SelectOperator select = replaceOperatorWithSelect(childRS, context, procCtx);
+ select.getConf().setOutputColumnNames(childRS.getConf().getOutputValueColumnNames());
+ select.getConf().setColList(childRS.getConf().getValueCols());
+ return select;
+ }
+
+ // Replaces the cRS with a SEL operator.
+ // If the child of cRS is an EXT, the EXT should also be removed.
+ protected static SelectOperator replaceOperatorWithSelect(Operator<?> operator,
+ ParseContext context, AbstractCorrelationProcCtx procCtx)
+ throws SemanticException {
+ RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+ SelectDesc select = new SelectDesc(null, null);
+
+ Operator<?> parent = getSingleParent(operator);
+ Operator<?> child = getSingleChild(operator);
+
+ parent.getChildOperators().clear();
+
+ SelectOperator sel = (SelectOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+ .getColumnInfos()), parent), inputRR, context);
+
+ sel.setColumnExprMap(operator.getColumnExprMap());
+
+ sel.setChildOperators(operator.getChildOperators());
+ for (Operator<? extends Serializable> ch : operator.getChildOperators()) {
+ ch.replaceParent(operator, sel);
+ }
+ if (child instanceof ExtractOperator) {
+ removeOperator(child, getSingleChild(child), sel, context);
+ procCtx.addRemovedOperator(child);
+ }
+ operator.setChildOperators(null);
+ operator.setParentOperators(null);
+ procCtx.addRemovedOperator(operator);
+ return sel;
+ }
+
+ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr,
+ ParseContext context, AbstractCorrelationProcCtx procCtx) throws SemanticException {
+
+ Operator<?> parent = getSingleParent(cRS);
+
+ if (parent instanceof GroupByOperator) {
+ // pRS-cGBYm-cRS-cGBYr (map aggregation) --> pRS-cGBYr(COMPLETE)
+ // copies desc of cGBYm to cGBYr and remove cGBYm and cRS
+ GroupByOperator cGBYm = (GroupByOperator) parent;
+
+ cGBYr.getConf().setKeys(cGBYm.getConf().getKeys());
+ cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators());
+ for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) {
+ aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE);
+ }
+ cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
+ cGBYr.setSchema(cGBYm.getSchema());
+ RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
+ context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
+ } else {
+ // pRS-cRS-cGBYr (no map aggregation) --> pRS-cGBYr(COMPLETE)
+ // revert expressions of cGBYr to that of cRS
+ cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS));
+ for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) {
+ aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS));
+ }
+
+ Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
+ RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+
+ Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
+ RowResolver newRR = new RowResolver();
+
+ List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
+ for (int i = 0; i < outputCols.size(); i++) {
+ String colName = outputCols.get(i);
+ String[] nm = oldRR.reverseLookup(colName);
+ ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+ newRR.put(nm[0], nm[1], colInfo);
+ ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
+ if (colExpr != null) {
+ newMap.put(colInfo.getInternalName(), colExpr);
+ }
+ }
+ cGBYr.setColumnExprMap(newMap);
+ cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
+ context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
+ }
+ cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
+
+ removeOperator(cRS, cGBYr, parent, context);
+ procCtx.addRemovedOperator(cRS);
+
+ if (parent instanceof GroupByOperator) {
+ removeOperator(parent, cGBYr, getSingleParent(parent), context);
+ procCtx.addRemovedOperator(cGBYr);
+ }
+ }
+
+ /** Throws an exception if the input operator is null.
+ * @param operator
+ * @throws SemanticException
+ */
+ protected static void isNullOperator(Operator<?> operator) throws SemanticException {
+ if (operator == null) {
+ throw new SemanticException("Operator " + operator.getName() + " (ID: " +
+ operator.getIdentifier() + ") is null.");
+ }
+ }
+
+ /**
+ * @param newOperator the operator to be inserted between parent and child
+ * @param parent
+ * @param child
+ * @throws SemanticException
+ */
+ protected static void insertOperatorBetween(
+ Operator<?> newOperator, Operator<?> parent, Operator<?> child)
+ throws SemanticException {
+ isNullOperator(newOperator);
+ isNullOperator(parent);
+ isNullOperator(child);
+
+ if (parent != getSingleParent(child)) {
+ throw new SemanticException("Operator " + parent.getName() + " (ID: " +
+ parent.getIdentifier() + ") is not the only parent of Operator " +
+ child.getName() + " (ID: " + child.getIdentifier() + ")");
+ }
+ if (child != getSingleChild(parent)) {
+ throw new SemanticException("Operator " + child.getName() + " (ID: " +
+ child.getIdentifier() + ") is not the only child of Operator " +
+ parent.getName() + " (ID: " + parent.getIdentifier() + ")");
+ }
+
+ newOperator.setParentOperators(Utilities.makeList(parent));
+ newOperator.setChildOperators(Utilities.makeList(child));
+
+ child.setParentOperators(Utilities.makeList(newOperator));
+ parent.setChildOperators(Utilities.makeList(newOperator));
+ }
+
+ protected static void removeOperator(Operator<?> target, Operator<?> child, Operator<?> parent,
+ ParseContext context) {
+ for (Operator<?> aparent : target.getParentOperators()) {
+ aparent.replaceChild(target, child);
+ }
+ for (Operator<?> achild : target.getChildOperators()) {
+ achild.replaceParent(target, parent);
+ }
+ target.setChildOperators(null);
+ target.setParentOperators(null);
+ context.getOpParseCtx().remove(target);
+ }
+
+ protected static Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+ ParseContext context) {
+ OpParseContext ctx = new OpParseContext(rr);
+ context.getOpParseCtx().put(op, ctx);
+ return op;
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/IntraQueryCorrelation.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+/**
+ * IntraQueryCorrelation records a sub-tree of the query plan tree which can be
+ * evaluated in a single MR job. The boundary of this sub-tree is recorded by
+ * the ReduceSinkOperators at the bottom of this sub-tree.
+ * Also, allReduceSinkOperators in IntraQueryCorrelation contains all
+ * ReduceSinkOperators of this sub-tree.
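+ *
+ * A minimal sketch of the expected lifecycle (names are illustrative):
+ *   IntraQueryCorrelation correlation = new IntraQueryCorrelation(minReducers);
+ *   correlation.addToAllReduceSinkOperators(rsop); // for every RS in the sub-tree
+ *   correlation.setJobFlowCorrelation(true, bottomRSOps); // once detection succeeds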
+ */
+public class IntraQueryCorrelation {
+ private boolean jobFlowCorrelation;
+
+ // The bottom layer ReduceSinkOperators. These ReduceSinkOperators are used
+ // to record the boundary of this sub-tree which can be evaluated in a single MR
+ // job.
+ private List<ReduceSinkOperator> bottomReduceSinkOperators;
+
+ // The number of reducers to be used for those bottom layer ReduceSinkOperators
+ private int numReducers;
+ // The minimum number of reducers for the bottom layer ReduceSinkOperators, used to
+ // avoid executing the query with too few reducers.
+ private final int minReducers;
+
+ // All ReduceSinkOperators in this sub-tree. This set is used when we start to remove unnecessary
+ // ReduceSinkOperators.
+ private final Set<ReduceSinkOperator> allReduceSinkOperators;
+
+ // Since we merge multiple operation paths, we assign new tags to bottom layer
+ // ReduceSinkOperators. This mapping maps new tags to the original tags associated
+ // with these bottom layer ReduceSinkOperators.
+ private final Map<Integer, Integer> newTagToOldTag;
+
+ // A map from new tags to indices of children of DemuxOperator (the first Operator at the
+ // Reduce side of optimized plan)
+ private final Map<Integer, Integer> newTagToChildIndex;
+
+ public IntraQueryCorrelation(int minReducers) {
+ this.jobFlowCorrelation = false;
+ this.numReducers = -1;
+ this.minReducers = minReducers;
+ this.allReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+ this.newTagToOldTag = new HashMap<Integer, Integer>();
+ this.newTagToChildIndex = new HashMap<Integer, Integer>();
+ }
+
+ public Map<Integer, Integer> getNewTagToOldTag() {
+ return newTagToOldTag;
+ }
+
+ public Map<Integer, Integer> getNewTagToChildIndex() {
+ return newTagToChildIndex;
+ }
+
+ public void setNewTag(Integer newTag, Integer oldTag, Integer childIndex) {
+ newTagToOldTag.put(newTag, oldTag);
+ newTagToChildIndex.put(newTag, childIndex);
+ }
+
+ public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+ allReduceSinkOperators.add(rsop);
+ }
+
+ public Set<ReduceSinkOperator> getAllReduceSinkOperators() {
+ return allReduceSinkOperators;
+ }
+
+ public void setJobFlowCorrelation(boolean jobFlowCorrelation,
+ List<ReduceSinkOperator> bottomReduceSinkOperators) {
+ this.jobFlowCorrelation = jobFlowCorrelation;
+ this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+ }
+
+ public boolean hasJobFlowCorrelation() {
+ return jobFlowCorrelation;
+ }
+
+ public List<ReduceSinkOperator> getBottomReduceSinkOperators() {
+ return bottomReduceSinkOperators;
+ }
+
+ public int getNumReducers() {
+ return numReducers;
+ }
+
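+ /**
+ * Tries to reconcile newNumReducers with the number of reducers already recorded
+ * for this correlation. A worked example (values assumed) with minReducers = 4:
+ *   adjustNumReducers(8) returns true (numReducers was -1 and becomes 8);
+ *   adjustNumReducers(8) returns true (8 matches the recorded value);
+ *   adjustNumReducers(16) returns false (16 conflicts with the recorded 8);
+ *   adjustNumReducers(2) returns false (2 is less than minReducers);
+ *   adjustNumReducers(-1) returns true (an unspecified count never conflicts).
+ */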
+ public boolean adjustNumReducers(int newNumReducers) {
+ assert newNumReducers != 0;
+ if (newNumReducers > 0) {
+ // If newNumReducers is less than minReducers, we will not consider the
+ // ReduceSinkOperator with this newNumReducers as a correlated ReduceSinkOperator
+ if (newNumReducers < minReducers) {
+ return false;
+ }
+ if (numReducers > 0) {
+ if (newNumReducers != numReducers) {
+ // If (numReducers > 0 && newNumReducers > 0 && newNumReducers != numReducers),
+ // we will not consider the ReduceSinkOperator with this newNumReducers as a correlated
+ // ReduceSinkOperator
+ return false;
+ }
+ } else {
+ // if numReducers < 0 and newNumReducers > 0
+ numReducers = newNumReducers;
+ }
+ }
+
+ return true;
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.optimizer.correlation.CorrelationOptimizer.CorrelationNodeProcCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DemuxDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * QueryPlanTreeTransformation contains static methods used to transform
+ * the query plan tree (operator tree) based on the correlation we have
+ * detected by Correlation Optimizer.
+ */
+public class QueryPlanTreeTransformation {
+ private static final Log LOG = LogFactory.getLog(QueryPlanTreeTransformation.class.getName());
+
+ private static void setNewTag(IntraQueryCorrelation correlation,
+ List<Operator<? extends OperatorDesc>> childrenOfDemux,
+ ReduceSinkOperator rsop, Map<ReduceSinkOperator, Integer> bottomRSToNewTag)
+ throws SemanticException {
+ int newTag = bottomRSToNewTag.get(rsop);
+ int oldTag = rsop.getConf().getTag();
+ // if this child of the DemuxOperator does not use tags, we just set the oldTag to 0
+ if (oldTag == -1) {
+ oldTag = 0;
+ }
+ Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
+ if (!childrenOfDemux.contains(child)) {
+ childrenOfDemux.add(child);
+ }
+ int childIndex = childrenOfDemux.indexOf(child);
+ correlation.setNewTag(newTag, oldTag, childIndex);
+ rsop.getConf().setTag(newTag);
+ }
+
+ /** Based on the correlation, we transform the query plan tree (operator tree).
+ * Here, we first create a DemuxOperator; all bottom ReduceSinkOperators
+ * (bottom means near the TableScanOperator) in the correlation will be
+ * the parents of the DemuxOperator. We also reassign tags to those
+ * ReduceSinkOperators. Then, we use MuxOperators to replace the ReduceSinkOperators
+ * which are not bottom ones in this correlation.
+ * Example: The original operator tree is ...
+ * JOIN2
+ * / \
+ * RS4 RS5
+ * / \
+ * GBY1 JOIN1
+ * | / \
+ * RS1 RS2 RS3
+ * Suppose GBY1, JOIN1, and JOIN2 can be executed in the same reducer
+ * (as optimized by the Correlation Optimizer).
+ * The new operator tree will be ...
+ * JOIN2
+ * |
+ * MUX
+ * / \
+ * GBY1 JOIN1
+ * \ /
+ * DEMUX
+ * / | \
+ * / | \
+ * / | \
+ * RS1 RS2 RS3
+ * @param pCtx
+ * @param corrCtx
+ * @param correlation
+ * @throws SemanticException
+ */
+ protected static void applyCorrelation(
+ ParseContext pCtx,
+ CorrelationNodeProcCtx corrCtx,
+ IntraQueryCorrelation correlation)
+ throws SemanticException {
+
+ final List<ReduceSinkOperator> bottomReduceSinkOperators =
+ correlation.getBottomReduceSinkOperators();
+ final int numReducers = correlation.getNumReducers();
+ List<Operator<? extends OperatorDesc>> childrenOfDemux =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Operator<? extends OperatorDesc>> parentRSsOfDemux =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ Map<Integer, Integer> childIndexToOriginalNumParents =
+ new HashMap<Integer, Integer>();
+ List<TableDesc> keysSerializeInfos = new ArrayList<TableDesc>();
+ List<TableDesc> valuesSerializeInfos = new ArrayList<TableDesc>();
+ Map<ReduceSinkOperator, Integer> bottomRSToNewTag =
+ new HashMap<ReduceSinkOperator, Integer>();
+ int newTag = 0;
+ for (ReduceSinkOperator rsop: bottomReduceSinkOperators) {
+ rsop.getConf().setNumReducers(numReducers);
+ bottomRSToNewTag.put(rsop, newTag);
+ parentRSsOfDemux.add(rsop);
+ keysSerializeInfos.add(rsop.getConf().getKeySerializeInfo());
+ valuesSerializeInfos.add(rsop.getConf().getValueSerializeInfo());
+ Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
+ if (!childrenOfDemux.contains(child)) {
+ childrenOfDemux.add(child);
+ int childIndex = childrenOfDemux.size() - 1;
+ childIndexToOriginalNumParents.put(childIndex, child.getNumParent());
+ }
+ newTag++;
+ }
+
+ for (ReduceSinkOperator rsop: bottomReduceSinkOperators) {
+ setNewTag(correlation, childrenOfDemux, rsop, bottomRSToNewTag);
+ }
+
+ // Create the DemuxOperator
+ DemuxDesc demuxDesc =
+ new DemuxDesc(
+ correlation.getNewTagToOldTag(),
+ correlation.getNewTagToChildIndex(),
+ childIndexToOriginalNumParents,
+ keysSerializeInfos,
+ valuesSerializeInfos);
+ Operator<? extends OperatorDesc> demuxOp = OperatorFactory.get(demuxDesc);
+ demuxOp.setChildOperators(childrenOfDemux);
+ demuxOp.setParentOperators(parentRSsOfDemux);
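+ // Re-wire each child of the demux: if a child has both bottom and
+ // non-bottom ReduceSinkOperators as parents, only the bottom ones are
+ // replaced by the DemuxOperator; otherwise the DemuxOperator becomes
+ // its single parent.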
+ for (Operator<? extends OperatorDesc> child: childrenOfDemux) {
+ List<Operator<? extends OperatorDesc>> parentsWithMultipleDemux =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ boolean hasBottomReduceSinkOperators = false;
+ boolean hasNonBottomReduceSinkOperators = false;
+ for (int i = 0; i < child.getParentOperators().size(); i++) {
+ Operator<? extends OperatorDesc> p = child.getParentOperators().get(i);
+ assert p instanceof ReduceSinkOperator;
+ ReduceSinkOperator rsop = (ReduceSinkOperator)p;
+ if (bottomReduceSinkOperators.contains(rsop)) {
+ hasBottomReduceSinkOperators = true;
+ parentsWithMultipleDemux.add(demuxOp);
+ } else {
+ hasNonBottomReduceSinkOperators = true;
+ parentsWithMultipleDemux.add(rsop);
+ }
+ }
+ if (hasBottomReduceSinkOperators && hasNonBottomReduceSinkOperators) {
+ child.setParentOperators(parentsWithMultipleDemux);
+ } else {
+ child.setParentOperators(Utilities.makeList(demuxOp));
+ }
+ }
+ for (Operator<? extends OperatorDesc> parent: parentRSsOfDemux) {
+ parent.setChildOperators(Utilities.makeList(demuxOp));
+ }
+
+ // Replace all ReduceSinkOperators which are not at the bottom of
+ // this correlation with MuxOperators.
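+ // A non-bottom RS feeding a GroupByOperator is removed and a single-parent
+ // MuxOperator is placed in front of that GroupByOperator; a non-bottom RS
+ // feeding a JoinOperator is replaced, together with its sibling RSs, by a
+ // multi-parent MuxOperator.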
+ Set<ReduceSinkOperator> handledRSs = new HashSet<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop : correlation.getAllReduceSinkOperators()) {
+ if (!bottomReduceSinkOperators.contains(rsop)) {
+ if (handledRSs.contains(rsop)) {
+ continue;
+ }
+ Operator<? extends OperatorDesc> childOP =
+ CorrelationUtilities.getSingleChild(rsop, true);
+ if (childOP instanceof GroupByOperator) {
+ CorrelationUtilities.removeReduceSinkForGroupBy(
+ rsop, (GroupByOperator)childOP, pCtx, corrCtx);
+ List<Operator<? extends OperatorDesc>> parentsOfMux =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ Operator<? extends OperatorDesc> parentOp =
+ CorrelationUtilities.getSingleParent(childOP, true);
+ parentsOfMux.add(parentOp);
+ Operator<? extends OperatorDesc> mux = OperatorFactory.get(
+ new MuxDesc(parentsOfMux));
+ mux.setChildOperators(Utilities.makeList(childOP));
+ mux.setParentOperators(parentsOfMux);
+ childOP.setParentOperators(Utilities.makeList(mux));
+ parentOp.setChildOperators(Utilities.makeList(mux));
+ } else {
+ // childOP is a JoinOperator
+ List<Operator<? extends OperatorDesc>> parentsOfMux =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Operator<? extends OperatorDesc>> siblingOPs =
+ CorrelationUtilities.findSiblingOperators(rsop);
+ for (Operator<? extends OperatorDesc> op: siblingOPs) {
+ if (op instanceof DemuxOperator) {
+ parentsOfMux.add(op);
+ } else if (op instanceof ReduceSinkOperator){
+ GroupByOperator pGBYm =
+ CorrelationUtilities.getSingleParent(op, GroupByOperator.class);
+ if (pGBYm != null) {
+ // We have a semi-join here.
+ // This map-side GroupByOperator needs to be removed.
+ CorrelationUtilities.removeOperator(
+ pGBYm, op, CorrelationUtilities.getSingleParent(pGBYm, true), pCtx);
+ }
+ handledRSs.add((ReduceSinkOperator)op);
+ parentsOfMux.add(CorrelationUtilities.getSingleParent(op, true));
+ } else {
+ throw new SemanticException("An slibing of ReduceSinkOperator is nethier a " +
+ "DemuxOperator nor a ReduceSinkOperator");
+ }
+ }
+ MuxDesc muxDesc = new MuxDesc(siblingOPs);
+ Operator<? extends OperatorDesc> mux = OperatorFactory.get(muxDesc);
+ mux.setChildOperators(Utilities.makeList(childOP));
+ mux.setParentOperators(parentsOfMux);
+
+ for (Operator<? extends OperatorDesc> op: parentsOfMux) {
+ if (op instanceof DemuxOperator) {
+ // op is a DemuxOperator and it directly connects to childOP.
+ // We will add this MuxOperator between DemuxOperator
+ // and childOP.
+ if (op.getChildOperators().contains(childOP)) {
+ op.replaceChild(childOP, mux);
+ }
+ } else {
+ // op is not a DemuxOperator, so it should have
+ // a single child.
+ op.setChildOperators(Utilities.makeList(mux));
+ }
+ }
+ childOP.setParentOperators(Utilities.makeList(mux));
+ }
+ }
+ }
+ for (ReduceSinkOperator rsop: handledRSs) {
+ rsop.setChildOperators(null);
+ rsop.setParentOperators(null);
+ pCtx.getOpParseCtx().remove(rsop);
+ }
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * If two reduce sink operators share the same partition/sort columns and order,
+ * they can be merged. This should happen after map-join optimization because
+ * map-join optimization will remove reduce sink operators.
+ *
+ * This optimizer removes/replaces the child RS (not the parent), which is the
+ * safer way for the DefaultGraphWalker.
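+ *
+ * For example (purely illustrative):
+ *   SELECT key, count(value) FROM t GROUP BY key DISTRIBUTE BY key SORT BY key;
+ * Here the ReduceSinkOperator introduced for DISTRIBUTE BY/SORT BY shares its
+ * partition/sort columns with the one introduced for GROUP BY, so the second
+ * shuffle can be removed.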
+ */
+public class ReduceSinkDeDuplication implements Transform {
+
+ private static final String RS = ReduceSinkOperator.getOperatorName();
+ private static final String GBY = GroupByOperator.getOperatorName();
+ private static final String JOIN = JoinOperator.getOperatorName();
+
+ protected ParseContext pGraphContext;
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ pGraphContext = pctx;
+
+ // create the processor context used during deduplication
+ ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+
+ // for auto-converted map-joins, it is not safe to dedup here (TODO)
+ boolean mergeJoins = !pctx.getConf().getBoolVar(HIVECONVERTJOIN) &&
+ !pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK);
+
+ // If multiple rules match with the same cost, the last rule will be chosen as the processor;
+ // see DefaultRuleDispatcher#dispatch()
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", RS + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+ opRules.put(new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc());
+ if (mergeJoins) {
+ opRules.put(new RuleRegExp("R3", JOIN + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getJoinReducerProc());
+ }
+ // TODO RS+JOIN
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(ReduceSinkDeduplicateProcFactory
+ .getDefaultProc(), opRules, cppCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of top operator nodes
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ return pGraphContext;
+ }
+
+ protected class ReduceSinkDeduplicateProcCtx extends AbstractCorrelationProcCtx {
+
+ public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
+ super(pctx);
+ }
+ }
+
+ static class ReduceSinkDeduplicateProcFactory {
+
+ public static NodeProcessor getReducerReducerProc() {
+ return new ReducerReducerProc();
+ }
+
+ public static NodeProcessor getGroupbyReducerProc() {
+ return new GroupbyReducerProc();
+ }
+
+ public static NodeProcessor getJoinReducerProc() {
+ return new JoinReducerProc();
+ }
+
+ public static NodeProcessor getDefaultProc() {
+ return new DefaultProc();
+ }
+ }
+
+ /*
+ * do nothing.
+ */
+ static class DefaultProc implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ }
+
+ public abstract static class AbstractReducerReducerProc implements NodeProcessor {
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkDeduplicateProcCtx dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+ if (dedupCtx.hasBeenRemoved((Operator<?>) nd)) {
+ return false;
+ }
+ ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+ Operator<?> child = CorrelationUtilities.getSingleChild(cRS);
+ if (child instanceof JoinOperator) {
+ return false; // not supported
+ }
+ if (child instanceof GroupByOperator) {
+ GroupByOperator cGBY = (GroupByOperator) child;
+ if (!CorrelationUtilities.hasGroupingSet(cRS) && !cGBY.getConf().isGroupingSetsPresent()) {
+ return process(cRS, cGBY, dedupCtx);
+ }
+ return false;
+ }
+ if (child instanceof ExtractOperator) {
+ return process(cRS, dedupCtx);
+ }
+ return false;
+ }
+
+ protected abstract Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException;
+
+ protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+ ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException;
+
+ // For the JOIN-RS case, it is generally not possible to merge if the child
+ // has more key/partition columns than the parents.
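+ // In addition, each key/partition expression of the child must map back to
+ // the parents' expressions at the same position (checked via
+ // CorrelationUtilities.indexOf below).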
+ protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+ throws SemanticException {
+ List<Operator<?>> parents = pJoin.getParentOperators();
+ ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
+ ReduceSinkDesc cRSc = cRS.getConf();
+ ReduceSinkDesc pRS0c = pRSs[0].getConf();
+ if (cRSc.getKeyCols().size() > pRS0c.getKeyCols().size()) {
+ return false;
+ }
+ if (cRSc.getPartitionCols().size() > pRS0c.getPartitionCols().size()) {
+ return false;
+ }
+ Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRS0c.getNumReducers());
+ if (moveReducerNumTo == null ||
+ moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
+ return false;
+ }
+
+ Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRS0c.getOrder());
+ if (moveRSOrderTo == null) {
+ return false;
+ }
+
+ boolean[] sorted = CorrelationUtilities.getSortedTags(pJoin);
+
+ int cKeySize = cRSc.getKeyCols().size();
+ for (int i = 0; i < cKeySize; i++) {
+ ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
+ ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+ for (int tag = 0; tag < pRSs.length; tag++) {
+ pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
+ }
+ int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+ if (found < 0) {
+ return false;
+ }
+ }
+ int cPartSize = cRSc.getPartitionCols().size();
+ for (int i = 0; i < cPartSize; i++) {
+ ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
+ ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+ for (int tag = 0; tag < pRSs.length; tag++) {
+ pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
+ }
+ int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+ if (found < 0) {
+ return false;
+ }
+ }
+
+ if (moveReducerNumTo > 0) {
+ for (ReduceSinkOperator pRS : pRSs) {
+ pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+ }
+ }
+ return true;
+ }
+
+ /**
+ * The current RS dedup removes/replaces the child RS, so it always copies
+ * the more specific parts of the child RS's configuration to the parent RS.
+ */
+ protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+ throws SemanticException {
+ int[] result = checkStatus(cRS, pRS, minReducer);
+ if (result == null) {
+ return false;
+ }
+ if (result[0] > 0) {
+ ArrayList<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
+ pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
+ }
+ if (result[1] > 0) {
+ ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+ pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+ }
+ if (result[2] > 0) {
+ pRS.getConf().setOrder(cRS.getConf().getOrder());
+ }
+ if (result[3] > 0) {
+ pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+ }
+ return true;
+ }
+
+ /**
+ * Returns merge directions between two RSs for each criterion (ordering,
+ * number of reducers, reducer keys, partition keys). Returns null if any
+ * of the categories is not mergeable.
+ *
+ * The value at each index can be -1, 0, or 1:
+ * 1. 0 means the two configurations in the category are the same;
+ * 2. -1 means the configuration of the parent RS is more specific than the child RS's;
+ * 3. 1 means the configuration of the child RS is more specific than the parent RS's.
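+ *
+ * For example, if the child RS keys and orders by (a, b) while the parent RS
+ * keys by (a), both partition by (a), the child leaves the number of reducers
+ * unset (-1) and the parent uses 4, the result is {1, 0, 1, -1}, assuming
+ * (b) can be backtracked to the parent.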
+ */
+ private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+ throws SemanticException {
+ ReduceSinkDesc cConf = cRS.getConf();
+ ReduceSinkDesc pConf = pRS.getConf();
+ Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder());
+ if (moveRSOrderTo == null) {
+ return null;
+ }
+ Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers());
+ if (moveReducerNumTo == null ||
+ moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) {
+ return null;
+ }
+ List<ExprNodeDesc> ckeys = cConf.getKeyCols();
+ List<ExprNodeDesc> pkeys = pConf.getKeyCols();
+ Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS);
+ if (moveKeyColTo == null) {
+ return null;
+ }
+ List<ExprNodeDesc> cpars = cConf.getPartitionCols();
+ List<ExprNodeDesc> ppars = pConf.getPartitionCols();
+ Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS);
+ if (movePartitionColTo == null) {
+ return null;
+ }
+ return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo};
+ }
+
+ /**
+ * The overlapping part of the keys should be the same between parent and
+ * child. If the child has more keys than the parent, the non-overlapping
+ * keys should be backtrackable to the parent.
+ */
+ private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+ ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+ Integer moveKeyColTo = 0;
+ if (ckeys == null || ckeys.isEmpty()) {
+ if (pkeys != null && !pkeys.isEmpty()) {
+ moveKeyColTo = -1;
+ }
+ } else {
+ if (pkeys == null || pkeys.isEmpty()) {
+ for (ExprNodeDesc ckey : ckeys) {
+ if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
+ // cKey is not present in parent
+ return null;
+ }
+ }
+ moveKeyColTo = 1;
+ } else {
+ moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
+ }
+ }
+ return moveKeyColTo;
+ }
+
+ // backtrack the key exprs of the child to the parent and compare them with the parent's
+ protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+ Operator<?> child, Operator<?> parent) throws SemanticException {
+ int common = Math.min(cexprs.size(), pexprs.size());
+ int limit = Math.max(cexprs.size(), pexprs.size());
+ int i = 0;
+ for (; i < common; i++) {
+ ExprNodeDesc pexpr = pexprs.get(i);
+ ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+ if (cexpr == null || !pexpr.isSame(cexpr)) {
+ return null;
+ }
+ }
+ for (; i < limit; i++) {
+ if (cexprs.size() > pexprs.size()) {
+ if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+ // cKey is not present in parent
+ return null;
+ }
+ }
+ }
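+ // -1: the parent has more keys; 0: both have the same number of keys;
+ // 1: the child has more keys (all of which were backtracked above)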
+ return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+ }
+
+ // the order of the overlapping keys should be exactly the same
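+ // e.g. a child order of "++" and a parent order of "+" share the prefix "+",
+ // so the child is more specific and 1 is returned; "+-" vs "++" conflicts
+ // and null is returned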
+ protected Integer checkOrder(String corder, String porder) {
+ if (corder == null || corder.trim().equals("")) {
+ if (porder == null || porder.trim().equals("")) {
+ return 0;
+ }
+ return -1;
+ }
+ if (porder == null || porder.trim().equals("")) {
+ return 1;
+ }
+ corder = corder.trim();
+ porder = porder.trim();
+ int target = Math.min(corder.length(), porder.length());
+ if (!corder.substring(0, target).equals(porder.substring(0, target))) {
+ return null;
+ }
+ return Integer.valueOf(corder.length()).compareTo(porder.length());
+ }
+
+ /**
+ * If the number of reducers for an RS is -1, the RS can have any number of
+ * reducers. This is generally true except for order-by or forced bucketing
+ * cases. If neither number of reducers is -1, the two numbers should be the same.
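+ *
+ * For example, (child, parent) = (-1, 4) gives -1 (keep the parent's 4);
+ * (4, -1) gives 1 (move the child's 4 to the parent); (4, 4) gives 0;
+ * (4, 8) gives null (not mergeable).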
+ */
+ protected Integer checkNumReducer(int creduce, int preduce) {
+ if (creduce < 0) {
+ if (preduce < 0) {
+ return 0;
+ }
+ return -1;
+ }
+ if (preduce < 0) {
+ return 1;
+ }
+ if (creduce != preduce) {
+ return null;
+ }
+ return 0;
+ }
+ }
+
+ static class GroupbyReducerProc extends AbstractReducerReducerProc {
+
+ // pRS-pGBY-cRS
+ @Override
+ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ GroupByOperator pGBY =
+ CorrelationUtilities.findPossibleParent(
+ cRS, GroupByOperator.class, dedupCtx.trustScript());
+ if (pGBY == null) {
+ return false;
+ }
+ ReduceSinkOperator pRS =
+ CorrelationUtilities.findPossibleParent(
+ pGBY, ReduceSinkOperator.class, dedupCtx.trustScript());
+ if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+ CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+ cRS, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+
+ // pRS-pGBY-cRS-cGBY
+ @Override
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+ ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ Operator<?> start = CorrelationUtilities.getStartForGroupBy(cRS);
+ GroupByOperator pGBY =
+ CorrelationUtilities.findPossibleParent(
+ start, GroupByOperator.class, dedupCtx.trustScript());
+ if (pGBY == null) {
+ return false;
+ }
+ ReduceSinkOperator pRS =
+ CorrelationUtilities.getSingleParent(pGBY, ReduceSinkOperator.class);
+ if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+ CorrelationUtilities.removeReduceSinkForGroupBy(
+ cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ static class JoinReducerProc extends AbstractReducerReducerProc {
+
+ // pRS-pJOIN-cRS
+ @Override
+ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ JoinOperator pJoin =
+ CorrelationUtilities.findPossibleParent(cRS, JoinOperator.class, dedupCtx.trustScript());
+ if (pJoin != null && merge(cRS, pJoin, dedupCtx.minReducer())) {
+ pJoin.getConf().setFixedAsSorted(true);
+ CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+ cRS, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+
+ // pRS-pJOIN-cRS-cGBY
+ @Override
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+ ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ Operator<?> start = CorrelationUtilities.getStartForGroupBy(cRS);
+ JoinOperator pJoin =
+ CorrelationUtilities.findPossibleParent(
+ start, JoinOperator.class, dedupCtx.trustScript());
+ if (pJoin != null && merge(cRS, pJoin, dedupCtx.minReducer())) {
+ pJoin.getConf().setFixedAsSorted(true);
+ CorrelationUtilities.removeReduceSinkForGroupBy(
+ cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ static class ReducerReducerProc extends AbstractReducerReducerProc {
+
+ // pRS-cRS
+ @Override
+ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ ReduceSinkOperator pRS =
+ CorrelationUtilities.findPossibleParent(
+ cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
+ if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+ CorrelationUtilities.replaceReduceSinkWithSelectOperator(
+ cRS, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+
+ // pRS-cRS-cGBY
+ @Override
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+ ReduceSinkDeduplicateProcCtx dedupCtx)
+ throws SemanticException {
+ Operator<?> start = CorrelationUtilities.getStartForGroupBy(cRS);
+ ReduceSinkOperator pRS =
+ CorrelationUtilities.findPossibleParent(
+ start, ReduceSinkOperator.class, dedupCtx.trustScript());
+ if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+ CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
+ return true;
+ }
+ return false;
+ }
+ }
+}