Posted to commits@hive.apache.org by he...@apache.org on 2012/09/30 22:41:03 UTC
svn commit: r1392105 [2/7] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimiz...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,1037 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBExpr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * Implementation of the correlation optimizer. The optimization is based on
+ * the paper "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
+ * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf).
+ * This optimizer first detects three kinds of correlations, Input Correlation (IC),
+ * Transit Correlation (TC), and Job Flow Correlation (JFC), and then merges
+ * correlated MapReduce jobs (MR-jobs) into one MR-job.
+ * Since opColumnExprMap, opParseCtx, and opRowResolver may be changed by
+ * other optimizers, the correlation optimizer currently has two phases.
+ * The first phase is the first transformation in the Optimizer; it records the
+ * original opColumnExprMap, opParseCtx, and opRowResolver. The second phase
+ * (the last transformation before the SimpleFetchOptimizer) performs correlation
+ * detection and query plan tree transformation.
+ *
+ * For the definitions of correlations, see the original YSmart paper.
+ *
+ * Rules for merging correlated MR-jobs implemented in this correlation
+ * optimizer are:
+ * 1. If an MR-job for a Join operation has the same partitioning keys as all of its
+ * preceding MR-jobs, the correlation optimizer merges these MR-jobs into one MR-job.
+ * 2. If an MR-job for a GroupBy and Aggregation operation has the same partitioning
+ * keys as its preceding MR-job, the correlation optimizer merges these two MR-jobs
+ * into one MR-job (see the example query at the end of this comment).
+ *
+ * Note: In the current implementation, if the correlation optimizer detects that the
+ * MR-jobs of a sub-plan tree are correlated, it transforms this sub-plan tree into a
+ * single MR-job when the input of this sub-plan tree is not a temporary table.
+ * Otherwise, the current implementation ignores this sub-plan tree.
+ *
+ * There are several pieces of future work that will enhance the correlation optimizer.
+ * Here are four examples:
+ * 1. Add a new rule: if two MR-jobs share the same partitioning keys and have
+ * common input tables, merge these two MR-jobs into a single MR-job.
+ * 2. The current implementation detects MR-jobs which have identical partitioning
+ * keys as correlated MR-jobs. However, the condition of identical partitioning keys
+ * can be relaxed to common partitioning keys.
+ * 3. The current implementation cannot optimize MR-jobs for aggregation functions
+ * with a DISTINCT keyword; this should be supported in a future implementation of
+ * the correlation optimizer.
+ * 4. Optimize queries involving self-joins.
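+ *
+ * Example for rule 2 (an illustrative, hypothetical query; the table names src and
+ * src1 are assumptions, not part of this patch):
+ *
+ *   SELECT tmp.key, COUNT(tmp.value)
+ *   FROM (SELECT x.key AS key, x.value AS value
+ *         FROM src x JOIN src1 y ON (x.key = y.key)) tmp
+ *   GROUP BY tmp.key;
+ *
+ * The MR-job evaluating the join and the MR-job evaluating the group-by both
+ * partition their data on key, so they have job flow correlation and can be
+ * merged into a single MR-job.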
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+ private final Map<String, String> aliastoTabName;
+ private final Map<String, Table> aliastoTab;
+
+ public CorrelationOptimizer() {
+ super();
+ aliastoTabName = new HashMap<String, String>();
+ aliastoTab = new HashMap<String, Table>();
+ pGraphContext = null;
+ }
+
+ private boolean initializeAliastoTabNameMapping(QB qb) {
+ // If any sub-query's qb is null, CorrelationOptimizer will not optimize this query.
+ // e.g. auto_join27.q
+ if (qb == null) {
+ return false;
+ }
+ boolean ret = true;
+ for (String alias : qb.getAliases()) {
+ aliastoTabName.put(alias, qb.getTabNameForAlias(alias));
+ aliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias));
+ }
+ for (String subqalias : qb.getSubqAliases()) {
+ QBExpr qbexpr = qb.getSubqForAlias(subqalias);
+ ret = ret && initializeAliastoTabNameMapping(qbexpr.getQB());
+ }
+ return ret;
+ }
+
+ protected ParseContext pGraphContext;
+ private Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+ private final Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
+ private final Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, RowResolver>();
+ private final Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>>();
+
+ private boolean isPhase1 = true;
+ private boolean abort = false;
+
+ private Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+ private Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
+ /**
+ * Transform the query plan tree. First, find out correlations between operators.
+ * Then, group the correlated operators and transform the plan tree accordingly.
+ *
+ * @param pctx
+ * current parse context
+ * @throws SemanticException
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ if (isPhase1) {
+ pGraphContext = pctx;
+ opParseCtx = pctx.getOpParseCtx();
+
+ CorrelationNodePhase1ProcCtx phase1ProcCtx = new CorrelationNodePhase1ProcCtx();
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules,
+ phase1ProcCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ isPhase1 = false;
+ abort = phase1ProcCtx.fileSinkOperatorCount > 1;
+ } else {
+ /*
+ * Types of correlations:
+ * 1) Input Correlation: Multiple nodes have input correlation
+ * (IC) if their input relation sets are not disjoint;
+ * 2) Transit Correlation: Multiple nodes have transit correlation
+ * (TC) if they have not only input correlation, but
+ * also the same partition key;
+ * 3) Job Flow Correlation: A node has job flow correlation
+ * (JFC) with one of its child nodes if it has the same
+ * partition key as that child node.
+ */
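+
+ /*
+ * An illustrative (hypothetical) example of these definitions: two aggregation
+ * MR-jobs scanning the same input table have input correlation; if they also
+ * partition their data on the same key, they have transit correlation as well.
+ * A join MR-job whose output is aggregated on the same key by a following MR-job
+ * has job flow correlation with that aggregation MR-job.
+ */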
+
+ pGraphContext = pctx;
+ if (abort) {
+ //TODO: handle queries with multiple FileSinkOperators;
+ LOG.info("Abort. Reasons are ...");
+ LOG.info("-- Currently, a query with multiple FileSinkOperators are not supported.");
+ return pGraphContext;
+ }
+
+
+ opParseCtx = pctx.getOpParseCtx();
+
+ groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide();
+ groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide();
+
+ QB qb = pGraphContext.getQB();
+ abort = !initializeAliastoTabNameMapping(qb);
+ if (abort) {
+ LOG.info("Abort. Reasons are ...");
+ LOG.info("-- This query or its sub-queries has a null qb.");
+ return pGraphContext;
+ }
+
+ // 0: Replace every map-side group by pattern (GBY-RS-GBY) with the
+ // non-map-side group by pattern (RS-GBY) if necessary
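+ // A sketch of the two equivalent patterns (illustrative; the operator modes are
+ // assumptions based on Hive's group-by plans, not stated in this patch):
+ // map-side: parents -> GBY (hash) -> RS -> GBY (mergepartial) -> children
+ // non-map-side: parents -> RS -> GBY (complete) -> children
+ // groupbyMapSide2NonMapSide and groupbyNonMapSide2MapSide link the start
+ // operators of the two patterns so one can be swapped for the other.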
+ if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<GroupByOperator, ReduceSinkOperator> entry:
+ groupbyMapSide2NonMapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getKey();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getValue();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = mapSidePatternStart.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = mapSidePatternEnd.getChildOperators();
+
+ nonMapSidePatternStart.setParentOperators(parents);
+ nonMapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent: parents) {
+ parent.replaceChild(mapSidePatternStart, nonMapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child: children) {
+ child.replaceParent(mapSidePatternEnd, nonMapSidePatternEnd);
+ }
+ addOperatorInfo(nonMapSidePatternStart);
+ addOperatorInfo(nonMapSidePatternEnd);
+ }
+ }
+
+ // 1: detect correlations
+ CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx();
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+ new CorrelationNodeProc());
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ abort = correlationCtx.isAbort();
+ int correlationsAppliedCount = 0;
+ if (abort) {
+ LOG.info("Abort. Reasons are ...");
+ for (String reason: correlationCtx.getAbortReasons()) {
+ LOG.info("-- " + reason);
+ }
+ } else {
+ // 2: transform the query plan tree
+ LOG.info("Begain query plan transformation based on intra-query correlations. " +
+ correlationCtx.getCorrelations().size() + " correlation(s) to be applied");
+ for (IntraQueryCorrelation correlation : correlationCtx.getCorrelations()) {
+ boolean ret = CorrelationOptimizerUtils.applyCorrelation(
+ correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver,
+ groupbyNonMapSide2MapSide, originalOpParseCtx);
+ if (ret) {
+ correlationsAppliedCount++;
+ }
+ }
+ }
+
+ // 3: if no correlation was applied, replace every non-map-side group by pattern
+ // (RS-GBY) with the map-side group by pattern (GBY-RS-GBY) if necessary
+ if (correlationsAppliedCount == 0 &&
+ pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<ReduceSinkOperator, GroupByOperator> entry:
+ groupbyNonMapSide2MapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getValue();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getKey();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = nonMapSidePatternStart.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = nonMapSidePatternEnd.getChildOperators();
+
+ mapSidePatternStart.setParentOperators(parents);
+ mapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent: parents) {
+ parent.replaceChild(nonMapSidePatternStart, mapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child: children) {
+ child.replaceParent(nonMapSidePatternEnd, mapSidePatternEnd);
+ }
+ }
+ }
+ LOG.info("Finish query plan transformation based on intra-query correlations. " +
+ correlationsAppliedCount + " correlation(s) actually be applied");
+ }
+ return pGraphContext;
+ }
+
+ private void addOperatorInfo(Operator<? extends OperatorDesc> op) {
+ OpParseContext opCtx = opParseCtx.get(op);
+ if (op.getColumnExprMap() != null) {
+ if (!originalOpColumnExprMap.containsKey(op)) {
+ originalOpColumnExprMap.put(op, op.getColumnExprMap());
+ }
+ }
+ if (opCtx != null) {
+ if (!originalOpParseCtx.containsKey(op)) {
+ originalOpParseCtx.put(op, opCtx);
+ }
+ if (opCtx.getRowResolver() != null) {
+ if (!originalOpRowResolver.containsKey(op)) {
+ originalOpRowResolver.put(op, opCtx.getRowResolver());
+ }
+ }
+ }
+ }
+
+ private NodeProcessor getPhase1DefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ addOperatorInfo(op);
+
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ ((CorrelationNodePhase1ProcCtx)procCtx).fileSinkOperatorCount++;
+ }
+ return null;
+ }
+ };
+ }
+
+ private class CorrelationNodeProc implements NodeProcessor {
+
+ public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) {
+ Operator<? extends OperatorDesc> op = rsop.getChildOperators().get(0);
+ while (!op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ return null;
+ }
+ assert op.getChildOperators().size() <= 1;
+ op = op.getChildOperators().get(0);
+ }
+ return (ReduceSinkOperator) op;
+ }
+
+ private void analyzeReduceSinkOperatorsOfJoinOperator(JoinCondDesc[] joinConds,
+ List<Operator<? extends OperatorDesc>> rsOps, Operator<? extends OperatorDesc> currentRsOp,
+ Set<ReduceSinkOperator> correlatedRsOps) {
+ if (correlatedRsOps.contains((ReduceSinkOperator) currentRsOp)) {
+ return;
+ }
+
+ correlatedRsOps.add((ReduceSinkOperator) currentRsOp);
+
+ int pos = rsOps.indexOf(currentRsOp);
+ for (int i=0; i<joinConds.length; i++) {
+ JoinCondDesc joinCond = joinConds[i];
+ int type = joinCond.getType();
+ if (pos == joinCond.getLeft()) {
+ if (type == JoinDesc.INNER_JOIN || type == JoinDesc.LEFT_OUTER_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getRight());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ } else if (pos == joinCond.getRight()) {
+ if (type == JoinDesc.INNER_JOIN || type == JoinDesc.RIGHT_OUTER_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getLeft());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ }
+ }
+ }
+
+ private Set<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+ Operator<? extends OperatorDesc> op, Set<String> keyColumns,
+ IntraQueryCorrelation correlation) throws SemanticException {
+
+ LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName());
+
+ Set<ReduceSinkOperator> correlatedReduceSinkOps = new HashSet<ReduceSinkOperator>();
+ if (op.getParentOperators() == null) {
+ return correlatedReduceSinkOps;
+ }
+ if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+ parent, keyColumns, correlation));
+ }
+ } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) {
+ Set<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ if (op.getName().equals(CommonJoinOperator.getOperatorName())) {
+ Set<String> tableNeedToCheck = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ for (ColumnInfo cinfo : originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) {
+ if (keyColumn.equals(cinfo.getInternalName())) {
+ tableNeedToCheck.add(cinfo.getTabAlias());
+ }
+ }
+ }
+ Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ Set<String> tableNames =
+ originalOpParseCtx.get(parent).getRowResolver().getTableNames();
+ for (String tbl : tableNames) {
+ if (tableNeedToCheck.contains(tbl)) {
+ correlatedRsOps.addAll(findCorrelatedReduceSinkOperators(parent,
+ newKeyColumns, correlation));
+ }
+ }
+ }
+
+ // Right now, if any ReduceSinkOperator of this JoinOperator is not correlated, we will
+ // not optimize this query
+ if (correlatedRsOps.size() == op.getParentOperators().size()) {
+ correlatedReduceSinkOps.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOps.clear();
+ }
+ } else {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+ parent, newKeyColumns, correlation));
+ }
+ }
+ } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) {
+ Set<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ ReduceSinkOperator rsop = (ReduceSinkOperator) op;
+ Set<String> thisKeyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : rsop.getConf().getKeyCols()) {
+ if (key instanceof ExprNodeColumnDesc) {
+ thisKeyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ boolean isCorrelated = false;
+ Set<String> intersection = new HashSet<String>(newKeyColumns);
+ intersection.retainAll(thisKeyColumns);
+ // TODO: should use whether the intersection is empty to evaluate if two
+ // corresponding operators are correlated
+ isCorrelated = (intersection.size() == thisKeyColumns.size() && !intersection.isEmpty());
+
+ ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop);
+ // Since the search starts from the ReduceSinkOperators near the FileSinkOperator
+ // and walks upstream, a child ReduceSinkOperator at a lower level can always be found
+ assert nextChildReduceSinkOperator != null;
+ if (isCorrelated) {
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+ CommonJoinOperator.getOperatorName())) {
+ if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() ||
+ intersection.size() != rsop.getConf().getKeyCols().size()) {
+ // Right now, we can only handle identical join keys.
+ isCorrelated = false;
+ }
+ }
+ }
+
+ if (isCorrelated) {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ if (((Operator<? extends OperatorDesc>) (op.getChildOperators().get(0))).getName()
+ .equals(CommonJoinOperator.getOperatorName())) {
+ JoinOperator joinOp = (JoinOperator)op.getChildOperators().get(0);
+ JoinCondDesc[] joinConds = joinOp.getConf().getConds();
+ List<Operator<? extends OperatorDesc>> rsOps = joinOp.getParentOperators();
+ Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, op, correlatedRsOps);
+ correlatedReduceSinkOps.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOps.add(rsop);
+ }
+ // This if block is useful when "isCorrelated = !(intersection.isEmpty());" is used
+ // for the evaluation of isCorrelated
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+ GroupByOperator.getOperatorName()) &&
+ (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) {
+ LOG.info("--found a RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns. "
+ + " The number of common keys is "
+ + intersection.size()
+ + ", and the number of keys of next group by operator"
+ + nextChildReduceSinkOperator.getConf().getKeyCols().size());
+ correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator);
+ }
+ } else {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ correlatedReduceSinkOps.clear();
+ correlation.getRSGBYToBeReplacedByGBYRSGBY().clear();
+ }
+ } else {
+ LOG.error("ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap");
+ throw new SemanticException("CorrelationOptimizer cannot optimize this plan. " +
+ "ReduceSinkOperator " + op.getIdentifier()
+ + " does not have ColumnExprMap");
+ }
+ return correlatedReduceSinkOps;
+ }
+
+ private Set<ReduceSinkOperator> exploitJFC(ReduceSinkOperator op,
+ CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation)
+ throws SemanticException {
+
+ correlationCtx.addWalked(op);
+ correlation.addToAllReduceSinkOperators(op);
+
+ Set<ReduceSinkOperator> reduceSinkOperators = new HashSet<ReduceSinkOperator>();
+
+ boolean shouldDetect = true;
+
+ List<ExprNodeDesc> keys = op.getConf().getKeyCols();
+ Set<String> keyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : keys) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ } else {
+ keyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ if (shouldDetect) {
+ Set<ReduceSinkOperator> newReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ LOG.info("Operator " + op.getIdentifier()
+ + ": start detecting correlation from this operator");
+ LOG.info("--keys of this operator: " + keyColumns.toString());
+ Set<ReduceSinkOperator> correlatedReduceSinkOperators =
+ findCorrelatedReduceSinkOperators(parent, keyColumns, correlation);
+ if (correlatedReduceSinkOperators.size() == 0) {
+ newReduceSinkOperators.add(op);
+ } else {
+ for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) {
+
+ // For two correlated ReduceSinkOperators, we say the one closer to the
+ // FileSinkOperator is up and the other one is down
+
+ if (!correlation.getUp2downRSops().containsKey(op)) {
+ correlation.getUp2downRSops().put(op, new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getUp2downRSops().get(op).add(rsop);
+
+ if (!correlation.getDown2upRSops().containsKey(rsop)) {
+ correlation.getDown2upRSops().put(rsop, new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getDown2upRSops().get(rsop).add(op);
+ Set<ReduceSinkOperator> exploited = exploitJFC(rsop, correlationCtx,
+ correlation);
+ if (exploited.size() == 0) {
+ newReduceSinkOperators.add(rsop);
+ } else {
+ newReduceSinkOperators.addAll(exploited);
+ }
+ }
+ }
+ }
+ reduceSinkOperators.addAll(newReduceSinkOperators);
+ }
+ return reduceSinkOperators;
+ }
+
+ private TableScanOperator findTableScanOperator(Operator<? extends OperatorDesc> startPoint) {
+ Operator<? extends OperatorDesc> thisOp = startPoint.getParentOperators().get(0);
+ while (true) {
+ if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ return null;
+ } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) {
+ return (TableScanOperator) thisOp;
+ } else {
+ if (thisOp.getParentOperators() != null) {
+ thisOp = thisOp.getParentOperators().get(0);
+ } else {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void annotateOpPlan(IntraQueryCorrelation correlation) {
+ Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPath =
+ new HashMap<ReduceSinkOperator, Integer>();
+ int indx = 0;
+ for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) {
+ if (!bottomReduceSink2OperationPath.containsKey(rsop)) {
+ bottomReduceSink2OperationPath.put(rsop, indx);
+ for (ReduceSinkOperator peerRSop : CorrelationOptimizerUtils
+ .findPeerReduceSinkOperators(rsop)) {
+ if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) {
+ bottomReduceSink2OperationPath.put(peerRSop, indx);
+ }
+ }
+ indx++;
+ }
+ }
+ correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OperationPath);
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+
+ if (correlationCtx.isWalked(op)) {
+ return null;
+ }
+
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName());
+ addOperatorInfo((Operator<? extends OperatorDesc>) nd);
+
+ if (op.getConf().getKeyCols().size() == 0 ||
+ (!op.getChildOperators().get(0).getName().equals(CommonJoinOperator.getOperatorName()) &&
+ !op.getChildOperators().get(0).getName().equals(GroupByOperator.getOperatorName()))) {
+ correlationCtx.addWalked(op);
+ return null;
+ }
+
+ // 1: find out correlation
+ IntraQueryCorrelation correlation = new IntraQueryCorrelation();
+ List<ReduceSinkOperator> peerReduceSinkOperators =
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(op);
+ List<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop : peerReduceSinkOperators) {
+ Set<ReduceSinkOperator> thisBottomReduceSinkOperators = exploitJFC(rsop,
+ correlationCtx, correlation);
+ if (thisBottomReduceSinkOperators.size() == 0) {
+ thisBottomReduceSinkOperators.add(rsop);
+ } else {
+ boolean isClear = false;
+ // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to
+ // TableScanOperators
+ for (ReduceSinkOperator bottomRsop : thisBottomReduceSinkOperators) {
+ TableScanOperator tsop = findTableScanOperator(bottomRsop);
+ if (tsop == null) {
+ isClear = true; // currently the optimizer can only optimize correlations involving
+ // source tables (input tables)
+ } else {
+ // record the mapping from the top and bottom ReduceSinkOperators
+ // to this TableScanOperator
+ if (!correlation.getTop2TSops().containsKey(rsop)) {
+ correlation.getTop2TSops().put(rsop, new ArrayList<TableScanOperator>());
+ }
+ correlation.getTop2TSops().get(rsop).add(tsop);
+
+ if (!correlation.getBottom2TSops().containsKey(bottomRsop)) {
+ correlation.getBottom2TSops().put(bottomRsop, new ArrayList<TableScanOperator>());
+ }
+ correlation.getBottom2TSops().get(bottomRsop).add(tsop);
+ }
+ }
+ if (isClear) {
+ thisBottomReduceSinkOperators.clear();
+ thisBottomReduceSinkOperators.add(rsop);
+ }
+ }
+ bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+ }
+
+ if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+ LOG.info("has job flow correlation");
+ correlation.setJobFlowCorrelation(true);
+ correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators);
+ annotateOpPlan(correlation);
+ }
+
+ if (correlation.hasJobFlowCorrelation()) {
+ boolean hasICandTC = findICandTC(correlation);
+ LOG.info("has input correlation and transit correlation? " + hasICandTC);
+ correlation.setInputCorrelation(hasICandTC);
+ correlation.setTransitCorrelation(hasICandTC);
+ boolean hasSelfJoin = hasSelfJoin(correlation);
+ LOG.info("has self-join? " + hasSelfJoin);
+ correlation.setInvolveSelfJoin(hasSelfJoin);
+ // TODO: support cases involving self-joins. For self-join related operation paths,
+ // each path should be filtered by a filter operator after the correlation dispatch
+ // operator
+ if (!hasSelfJoin) {
+ LOG.info("correlation detected");
+ correlationCtx.addCorrelation(correlation);
+ } else {
+ LOG.info("correlation discarded. The current optimizer cannot optimize it");
+ }
+ }
+ correlationCtx.addWalkedAll(peerReduceSinkOperators);
+ return null;
+ }
+
+ private boolean hasSelfJoin(IntraQueryCorrelation correlation) {
+ boolean hasSelfJoin = false;
+ for (Entry<String, List<ReduceSinkOperator>> entry : correlation
+ .getTable2CorrelatedRSops().entrySet()) {
+ for (ReduceSinkOperator rsop : entry.getValue()) {
+ Set<ReduceSinkOperator> intersection = new HashSet<ReduceSinkOperator>(
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop));
+ intersection.retainAll(entry.getValue());
+ // if self-join is involved
+ if (intersection.size() > 1) {
+ hasSelfJoin = true;
+ return hasSelfJoin;
+ }
+ }
+ }
+ return hasSelfJoin;
+ }
+
+ private boolean findICandTC(IntraQueryCorrelation correlation) {
+
+ boolean hasICandTC = false;
+ Map<String, List<ReduceSinkOperator>> table2RSops =
+ new HashMap<String, List<ReduceSinkOperator>>();
+ Map<String, List<TableScanOperator>> table2TSops =
+ new HashMap<String, List<TableScanOperator>>();
+
+ for (Entry<ReduceSinkOperator, List<TableScanOperator>> entry : correlation
+ .getBottom2TSops().entrySet()) {
+ String tbl = aliastoTabName.get(entry.getValue().get(0).getConf().getAlias());
+ if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) {
+ table2RSops.put(tbl, new ArrayList<ReduceSinkOperator>());
+ table2TSops.put(tbl, new ArrayList<TableScanOperator>());
+ }
+ assert entry.getValue().size() == 1;
+ table2RSops.get(tbl).add(entry.getKey());
+ table2TSops.get(tbl).add(entry.getValue().get(0));
+ }
+
+ for (Entry<String, List<ReduceSinkOperator>> entry : table2RSops.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ hasICandTC = true;
+ break;
+ }
+ }
+ correlation.setICandTCCorrelation(table2RSops, table2TSops);
+ return hasICandTC;
+ }
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName() + ". No actual work to do");
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ if (op.getName().equals(MapJoinOperator.getOperatorName())) {
+ correlationCtx.setAbort(true);
+ correlationCtx.getAbortReasons().add("Found MAPJOIN");
+ }
+ addOperatorInfo((Operator<? extends OperatorDesc>) nd);
+ return null;
+ }
+ };
+ }
+
+ public class IntraQueryCorrelation {
+
+ private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> down2upRSops =
+ new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+ private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> up2downRSops =
+ new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+
+ private final Map<ReduceSinkOperator, List<TableScanOperator>> top2TSops =
+ new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+ private final Map<ReduceSinkOperator, List<TableScanOperator>> bottom2TSops =
+ new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+
+ private List<ReduceSinkOperator> topReduceSinkOperators;
+ private List<ReduceSinkOperator> bottomReduceSinkOperators;
+
+ private Map<String, List<ReduceSinkOperator>> table2CorrelatedRSops;
+
+ private Map<String, List<TableScanOperator>> table2CorrelatedTSops;
+
+ private Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap;
+
+ private final Map<Integer, Map<Integer, List<Integer>>> dispatchConf =
+ new HashMap<Integer, Map<Integer, List<Integer>>>(); // inputTag->(Child->outputTag)
+ private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf =
+ new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+ private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf =
+ new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
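+ // An illustrative (hypothetical) entry: dispatchConf.get(0).get(1) == [2] would
+ // mean that rows arriving at the dispatch operator with input tag 0 are forwarded
+ // to its child at position 1 with output tag 2.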
+
+ private final Set<ReduceSinkOperator> allReduceSinkOperators =
+ new HashSet<ReduceSinkOperator>();
+
+ // This set contains all ReduceSink-GroupBy operator pairs that should be replaced
+ // by the GroupBy-ReduceSink-GroupBy pattern.
+ // The mode of the first GroupByOperator is hash, and this one will be used to group records.
+ private final Set<ReduceSinkOperator> rSGBYToBeReplacedByGBYRSGBY =
+ new HashSet<ReduceSinkOperator>();
+
+ public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) {
+ rSGBYToBeReplacedByGBYRSGBY.add(rsop);
+ }
+
+ public Set<ReduceSinkOperator> getRSGBYToBeReplacedByGBYRSGBY() {
+ return rSGBYToBeReplacedByGBYRSGBY;
+ }
+
+ public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+ allReduceSinkOperators.add(rsop);
+ }
+
+ public Set<ReduceSinkOperator> getAllReduceSinkOperators() {
+ return allReduceSinkOperators;
+ }
+
+ public Map<Integer, Map<Integer, List<Integer>>> getDispatchConf() {
+ return dispatchConf;
+ }
+
+ public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchValueSelectDescConf() {
+ return dispatchValueSelectDescConf;
+ }
+
+ public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchKeySelectDescConf() {
+ return dispatchKeySelectDescConf;
+ }
+
+ public void addOperationPathToDispatchConf(Integer opPlan) {
+ if (!dispatchConf.containsKey(opPlan)) {
+ dispatchConf.put(opPlan, new HashMap<Integer, List<Integer>>());
+ }
+ }
+
+ public Map<Integer, List<Integer>> getDispatchConfForOperationPath(Integer opPlan) {
+ return dispatchConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) {
+ if (!dispatchValueSelectDescConf.containsKey(opPlan)) {
+ dispatchValueSelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+ }
+ }
+
+ public Map<Integer, List<SelectDesc>> getDispatchValueSelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchValueSelectDescConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) {
+ if (!dispatchKeySelectDescConf.containsKey(opPlan)) {
+ dispatchKeySelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+ }
+ }
+
+ public Map<Integer, List<SelectDesc>> getDispatchKeySelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchKeySelectDescConf.get(opPlan);
+ }
+
+ private boolean inputCorrelation = false;
+ private boolean transitCorrelation = false;
+ private boolean jobFlowCorrelation = false;
+
+ public void setBottomReduceSink2OperationPathMap(
+ Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap) {
+ this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap;
+ }
+
+ public Map<ReduceSinkOperator, Integer> getBottomReduceSink2OperationPathMap() {
+ return bottomReduceSink2OperationPathMap;
+ }
+
+ public void setInputCorrelation(boolean inputCorrelation) {
+ this.inputCorrelation = inputCorrelation;
+ }
+
+ public boolean hasInputCorrelation() {
+ return inputCorrelation;
+ }
+
+ public void setTransitCorrelation(boolean transitCorrelation) {
+ this.transitCorrelation = transitCorrelation;
+ }
+
+ public boolean hasTransitCorrelation() {
+ return transitCorrelation;
+ }
+
+ public void setJobFlowCorrelation(boolean jobFlowCorrelation) {
+ this.jobFlowCorrelation = jobFlowCorrelation;
+ }
+
+ public boolean hasJobFlowCorrelation() {
+ return jobFlowCorrelation;
+ }
+
+ public Map<ReduceSinkOperator, List<TableScanOperator>> getTop2TSops() {
+ return top2TSops;
+ }
+
+ public Map<ReduceSinkOperator, List<TableScanOperator>> getBottom2TSops() {
+ return bottom2TSops;
+ }
+
+ public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getDown2upRSops() {
+ return down2upRSops;
+ }
+
+ public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getUp2downRSops() {
+ return up2downRSops;
+ }
+
+ public void setJFCCorrelation(List<ReduceSinkOperator> peerReduceSinkOperators,
+ List<ReduceSinkOperator> bottomReduceSinkOperators) {
+ this.topReduceSinkOperators = peerReduceSinkOperators;
+ this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+ }
+
+
+ public List<ReduceSinkOperator> getTopReduceSinkOperators() {
+ return topReduceSinkOperators;
+ }
+
+ public List<ReduceSinkOperator> getBottomReduceSinkOperators() {
+ return bottomReduceSinkOperators;
+ }
+
+ public void setICandTCCorrelation(Map<String, List<ReduceSinkOperator>> table2RSops,
+ Map<String, List<TableScanOperator>> table2TSops) {
+ this.table2CorrelatedRSops = table2RSops;
+ this.table2CorrelatedTSops = table2TSops;
+ }
+
+ public Map<String, List<ReduceSinkOperator>> getTable2CorrelatedRSops() {
+ return table2CorrelatedRSops;
+ }
+
+ public Map<String, List<TableScanOperator>> getTable2CorrelatedTSops() {
+ return table2CorrelatedTSops;
+ }
+
+ private boolean isInvolveSelfJoin = false;
+
+ public boolean isInvolveSelfJoin() {
+ return isInvolveSelfJoin;
+ }
+
+ public void setInvolveSelfJoin(boolean isInvolveSelfJoin) {
+ this.isInvolveSelfJoin = isInvolveSelfJoin;
+ }
+
+ }
+
+ private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx {
+ public int fileSinkOperatorCount = 0;
+ }
+
+ private class CorrelationNodeProcCtx implements NodeProcessorCtx {
+
+ private boolean abort;
+
+ private final List<String> abortReasons;
+
+ private final Set<ReduceSinkOperator> walked;
+
+ private final List<IntraQueryCorrelation> correlations;
+
+ public CorrelationNodeProcCtx() {
+ walked = new HashSet<ReduceSinkOperator>();
+ correlations = new ArrayList<IntraQueryCorrelation>();
+ abort = false;
+ abortReasons = new ArrayList<String>();
+ }
+
+ public void setAbort(boolean abort) {
+ this.abort = abort;
+ }
+
+ public boolean isAbort() {
+ return abort;
+ }
+
+ public List<String> getAbortReasons() {
+ return abortReasons;
+ }
+
+ public void addCorrelation(IntraQueryCorrelation correlation) {
+ correlations.add(correlation);
+ }
+
+ public List<IntraQueryCorrelation> getCorrelations() {
+ return correlations;
+ }
+
+ public boolean isWalked(ReduceSinkOperator op) {
+ return walked.contains(op);
+ }
+
+ public void addWalked(ReduceSinkOperator op) {
+ walked.add(op);
+ }
+
+ public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+ walked.addAll(c);
+ }
+
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,801 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator;
+import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.ForwardDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+
+public final class CorrelationOptimizerUtils {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName());
+
+ public static boolean isExisted(ExprNodeDesc expr, List<ExprNodeDesc> col_list) {
+ for (ExprNodeDesc thisExpr : col_list) {
+ if (expr.getExprString().equals(thisExpr.getExprString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static String getColumnName(Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+ for (Entry<String, ExprNodeDesc> entry : opColumnExprMap.entrySet()) {
+ if (expr.getExprString().equals(entry.getValue().getExprString())) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+
+ public static Operator<? extends OperatorDesc> unionUsedColumnsAndMakeNewSelect(
+ List<ReduceSinkOperator> rsops,
+ IntraQueryCorrelation correlation, Map<Operator<? extends OperatorDesc>,
+ Map<String, ExprNodeDesc>> originalOpColumnExprMap, TableScanOperator input,
+ ParseContext pGraphContext,
+ Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx) {
+
+ ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> col_list = new ArrayList<ExprNodeDesc>();
+ RowResolver out_rwsch = new RowResolver();
+ boolean isSelectAll = false;
+
+ int pos = 0;
+ for (ReduceSinkOperator rsop : rsops) {
+ Operator<? extends OperatorDesc> curr = correlation.getBottom2TSops().get(rsop).get(0)
+ .getChildOperators().get(0);
+ while (true) {
+ if (curr.getName().equals(SelectOperator.getOperatorName())) {
+ SelectOperator selOp = (SelectOperator) curr;
+ if (selOp.getColumnExprMap() != null) {
+ for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+ ExprNodeDesc expr = entry.getValue();
+ if (!isExisted(expr, col_list)
+ && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+ .containsKey(entry.getKey())) {
+ col_list.add(expr);
+ String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+ .get(entry.getKey());
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = entry.getKey();
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ pos++;
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ }
+ }
+ } else {
+ for (ExprNodeDesc expr : selOp.getConf().getColList()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver()
+ .getInvRslvMap().get(expr.getCols().get(0));
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ }
+ break;
+ } else if (curr.getName().equals(FilterOperator.getOperatorName())) {
+ isSelectAll = true;
+ break;
+ } else if (curr.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ ReduceSinkOperator thisRSop = (ReduceSinkOperator) curr;
+ for (ExprNodeDesc expr : thisRSop.getConf().getKeyCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver()
+ .getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ for (ExprNodeDesc expr : thisRSop.getConf().getValueCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver()
+ .getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+
+ break;
+ } else {
+ curr = curr.getChildOperators().get(0);
+ }
+ }
+ }
+
+ Operator<? extends OperatorDesc> output;
+ if (isSelectAll) {
+ output = input;
+ } else {
+ output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(col_list, columnNames, false), new RowSchema(
+ out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx());
+ output.setColumnExprMap(colExprMap);
+ output.setChildOperators(Utilities.makeList());
+
+ }
+
+ return output;
+ }
+
+
+ public static Operator<? extends OperatorDesc> putOpInsertMap(
+ Operator<? extends OperatorDesc> op,
+ RowResolver rr, LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
+ OpParseContext ctx = new OpParseContext(rr);
+ opParseCtx.put(op, ctx);
+ op.augmentPlan();
+ return op;
+ }
+
+ public static Map<Operator<? extends OperatorDesc>, String> getAliasIDtTopOps(
+ Map<String, Operator<? extends OperatorDesc>> topOps) {
+ Map<Operator<? extends OperatorDesc>, String> aliasIDtTopOps =
+ new HashMap<Operator<? extends OperatorDesc>, String>();
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : topOps.entrySet()) {
+ assert !aliasIDtTopOps.containsKey(entry.getValue());
+ aliasIDtTopOps.put(entry.getValue(), entry.getKey());
+ }
+ return aliasIDtTopOps;
+ }
+
+ /**
+ * Find all peer ReduceSinkOperators of op (op included), i.e., all
+ * ReduceSinkOperators that share the same child operator with op.
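+ *
+ * For an illustrative (hypothetical) plan in which RS1, RS2, and RS3 all feed the
+ * same JoinOperator, calling this method on RS1 returns [RS1, RS2, RS3].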
+ */
+ public static List<ReduceSinkOperator> findPeerReduceSinkOperators(ReduceSinkOperator op) {
+ List<ReduceSinkOperator> peerReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+ assert children.size() == 1; // A ReduceSinkOperator should have only one child
+ for (Operator<? extends OperatorDesc> parent : children.get(0).getParentOperators()) {
+ assert (parent instanceof ReduceSinkOperator);
+ peerReduceSinkOperators.add((ReduceSinkOperator) parent);
+ }
+ return peerReduceSinkOperators;
+ }
+
+ public static List<CorrelationLocalSimulativeReduceSinkOperator> findPeerFakeReduceSinkOperators(
+ CorrelationLocalSimulativeReduceSinkOperator op) {
+
+ List<CorrelationLocalSimulativeReduceSinkOperator> peerReduceSinkOperators =
+ new ArrayList<CorrelationLocalSimulativeReduceSinkOperator>();
+
+ List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+ assert children.size() == 1;
+
+ for (Operator<? extends OperatorDesc> parent : children.get(0).getParentOperators()) {
+ assert (parent instanceof CorrelationLocalSimulativeReduceSinkOperator);
+ peerReduceSinkOperators.add((CorrelationLocalSimulativeReduceSinkOperator) parent);
+ }
+
+ return peerReduceSinkOperators;
+ }
+
+ public static boolean applyCorrelation(
+ IntraQueryCorrelation correlation,
+ ParseContext inputpGraphContext,
+ Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+ Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver,
+ Map<ReduceSinkOperator, GroupByOperator> groupbyRegular2MapSide,
+ Map<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx)
+ throws SemanticException {
+
+ ParseContext pGraphContext = inputpGraphContext;
+
+ // 0: if necessary, replace RS-GBY with GBY-RS-GBY. In GBY-RS-GBY, the mode of the first
+ // GBY is hash, so it can group records on the map side
+ LOG.info("apply correlation step 0: replace RS-GBY with GBY-RS-GBY");
+ for (ReduceSinkOperator rsop : correlation.getRSGBYToBeReplacedByGBYRSGBY()) {
+ LOG.info("operator " + rsop.getIdentifier() + " should be replaced");
+ assert !correlation.getBottomReduceSinkOperators().contains(rsop);
+ GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop);
+ assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator);
+ ReduceSinkOperator newRsop = (ReduceSinkOperator) mapSideGBY.getChildOperators().get(0);
+ GroupByOperator reduceSideGBY = (GroupByOperator) newRsop.getChildOperators().get(0);
+ GroupByOperator oldReduceSideGBY = (GroupByOperator) rsop.getChildOperators().get(0);
+ List<Operator<? extends OperatorDesc>> parents = rsop.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = oldReduceSideGBY.getChildOperators();
+ mapSideGBY.setParentOperators(parents);
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parent.replaceChild(rsop, mapSideGBY);
+ }
+ reduceSideGBY.setChildOperators(children);
+ for (Operator<? extends OperatorDesc> child : children) {
+ child.replaceParent(oldReduceSideGBY, reduceSideGBY);
+ }
+ correlation.getAllReduceSinkOperators().remove(rsop);
+ correlation.getAllReduceSinkOperators().add(newRsop);
+ }
+
+
+ Operator<? extends OperatorDesc> curr;
+
+ // 1: Create table scan operator
+ LOG.info("apply correlation step 1: create table scan operator");
+ Map<TableScanOperator, TableScanOperator> oldTSOP2newTSOP =
+ new HashMap<TableScanOperator, TableScanOperator>();
+ Map<String, Operator<? extends OperatorDesc>> oldTopOps = pGraphContext.getTopOps();
+ Map<Operator<? extends OperatorDesc>, String> oldAliasIDtTopOps =
+ getAliasIDtTopOps(oldTopOps);
+ Map<TableScanOperator, Table> oldTopToTable = pGraphContext.getTopToTable();
+ Map<String, Operator<? extends OperatorDesc>> addedTopOps =
+ new HashMap<String, Operator<? extends OperatorDesc>>();
+ Map<TableScanOperator, Table> addedTopToTable = new HashMap<TableScanOperator, Table>();
+ for (Entry<String, List<TableScanOperator>> entry : correlation.getTable2CorrelatedTSops()
+ .entrySet()) {
+ TableScanOperator oldTSop = entry.getValue().get(0);
+ TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf()
+ .getVirtualCols());
+ tsDesc.setForwardRowNumber(true);
+ OpParseContext opParseCtx = pGraphContext.getOpParseCtx().get(oldTSop);
+ Operator<? extends OperatorDesc> top = putOpInsertMap(OperatorFactory.get(tsDesc,
+ new RowSchema(opParseCtx.getRowResolver().getColumnInfos())),
+ opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx());
+ top.setParentOperators(null);
+ top.setChildOperators(Utilities.makeList());
+ for (TableScanOperator tsop : entry.getValue()) {
+ addedTopOps.put(oldAliasIDtTopOps.get(tsop), top);
+ addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop));
+ oldTSOP2newTSOP.put(tsop, (TableScanOperator) top);
+ }
+ }
+
+ List<Operator<? extends OperatorDesc>> childrenOfDispatch =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) {
+ // TODO: currently, the correlation optimizer cannot handle the case where
+ // a table is directly connected to a post-computation operator, e.g.
+ // Join
+ // / \
+ // GBY T2
+ // |
+ // T1
+ if (!correlation.getBottomReduceSinkOperators()
+ .containsAll(findPeerReduceSinkOperators(rsop))) {
+ LOG.info("Can not handle the case that " +
+ "a table is directly connected to a post computation operator. Use original plan");
+ return false;
+ }
+ Operator<? extends OperatorDesc> op = rsop.getChildOperators().get(0);
+ if (!childrenOfDispatch.contains(op)) {
+ LOG.info("Add :" + op.getIdentifier() + " " + op.getName()
+ + " to the children list of dispatch operator");
+ childrenOfDispatch.add(op);
+ }
+ }
+
+ int opTag = 0;
+ Map<Integer, ReduceSinkOperator> operationPath2CorrelationReduceSinkOps =
+ new HashMap<Integer, ReduceSinkOperator>();
+ for (Entry<String, List<ReduceSinkOperator>> entry : correlation
+ .getTable2CorrelatedRSops().entrySet()) {
+
+ // 2: Create select operator for shared operation paths
+ LOG.info("apply correlation step 2: create select operator for shared operation path for the table of "
+ + entry.getKey());
+ curr =
+ unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap,
+ oldTSOP2newTSOP
+ .get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)),
+ pGraphContext, originalOpParseCtx);
+
+ // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator
+ LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of "
+ + entry.getKey());
+ curr =
+ createCorrelationCompositeReduceSinkOperator(
+ correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(),
+ correlation, curr, pGraphContext,
+ childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag,
+ originalOpRowResolver);
+
+ operationPath2CorrelationReduceSinkOps.put(Integer.valueOf(opTag), (ReduceSinkOperator) curr);
+ opTag++;
+ }
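+ // Each correlated table now contributes one pipeline of the form
+ // TS -> SEL -> (filters or forward) -> CorrelationCompositeOperator ->
+ // CorrelationReduceSinkOperator, identified by its operation path tag.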
+
+ // 4: Create correlation dispatch operator for operation paths
+ LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths");
+ RowResolver outputRS = new RowResolver();
+ List<Operator<? extends OperatorDesc>> correlationReduceSinkOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Entry<Integer, ReduceSinkOperator> entry : operationPath2CorrelationReduceSinkOps
+ .entrySet()) {
+ curr = entry.getValue();
+ correlationReduceSinkOps.add(curr);
+ RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver();
+ for (Entry<String, LinkedHashMap<String, ColumnInfo>> e1 : inputRS.getRslvMap().entrySet()) {
+ for (Entry<String, ColumnInfo> e2 : e1.getValue().entrySet()) {
+ outputRS.put(e1.getKey(), e2.getKey(), e2.getValue());
+ }
+ }
+ }
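+ // outputRS is the union of the row resolvers of all correlation reduce sinks,
+ // so the dispatch operator can describe a row coming from any operation path.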
+
+ Operator<? extends OperatorDesc> dispatchOp = putOpInsertMap(OperatorFactory.get(
+ new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation
+ .getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()),
+ new RowSchema(outputRS.getColumnInfos())),
+ outputRS, pGraphContext.getOpParseCtx());
+
+ dispatchOp.setParentOperators(correlationReduceSinkOps);
+ for (Operator<? extends OperatorDesc> thisOp : correlationReduceSinkOps) {
+ thisOp.setChildOperators(Utilities.makeList(dispatchOp));
+ }
+
+ // 5: Replace the old plan in the original plan tree with new plan
+ LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan");
+ Set<Operator<? extends OperatorDesc>> processed =
+ new HashSet<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> op : childrenOfDispatch) {
+ List<Operator<? extends OperatorDesc>> parents =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> oldParent : op.getParentOperators()) {
+ if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) {
+ parents.add(oldParent);
+ }
+ }
+ parents.add(dispatchOp);
+ op.setParentOperators(parents);
+ }
+ dispatchOp.setChildOperators(childrenOfDispatch);
+ HashMap<String, Operator<? extends OperatorDesc>> newTopOps =
+ new HashMap<String, Operator<? extends OperatorDesc>>();
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : oldTopOps.entrySet()) {
+ if (addedTopOps.containsKey(entry.getKey())) {
+ newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey()));
+ } else {
+ newTopOps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ pGraphContext.setTopOps(newTopOps);
+ HashMap<TableScanOperator, Table> newTopToTable = new HashMap<TableScanOperator, Table>();
+ for (Entry<TableScanOperator, Table> entry : oldTopToTable.entrySet()) {
+ if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) {
+ newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()),
+ addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey())));
+ } else {
+ newTopToTable.put(entry.getKey(), entry.getValue());
+ }
+ }
+ pGraphContext.setTopToTable(newTopToTable);
+
+ // 6: Change every JFC-related ReduceSinkOperator to a
+ // CorrelationLocalSimulativeReduceSinkOperator
+ LOG.info("apply correlation step 6: change every JFC-related reduce sink operator to a " +
+ "CorrelationLocalSimulativeReduceSinkOperator");
+ for (ReduceSinkOperator rsop : correlation.getAllReduceSinkOperators()) {
+ if (!correlation.getBottomReduceSinkOperators().contains(rsop)) {
+ Operator<? extends OperatorDesc> childOP = rsop.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> parentOP = rsop.getParentOperators().get(0);
+ Operator<? extends OperatorDesc> correlationLocalSimulativeReduceSinkOperator =
+ putOpInsertMap(
+ OperatorFactory.get(
+ new CorrelationLocalSimulativeReduceSinkDesc(rsop.getConf()),
+ new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver()
+ .getColumnInfos())),
+ pGraphContext.getOpParseCtx().get(rsop).getRowResolver(),
+ pGraphContext.getOpParseCtx());
+ correlationLocalSimulativeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP));
+ correlationLocalSimulativeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP));
+ parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop),
+ correlationLocalSimulativeReduceSinkOperator);
+ childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop),
+ correlationLocalSimulativeReduceSinkOperator);
+ }
+ }
+ return true;
+ }
+
+ public static Operator<? extends OperatorDesc> createCorrelationCompositeReduceSinkOperator(
+ List<TableScanOperator> tsops,
+ List<ReduceSinkOperator> rsops,
+ IntraQueryCorrelation correlation,
+ Operator<? extends OperatorDesc> input,
+ ParseContext pGraphContext,
+ List<Operator<? extends OperatorDesc>> childrenOfDispatch,
+ String tableName,
+ Map<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+ int newTag,
+ Map<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver)
+ throws SemanticException {
+
+ // Create CorrelationCompositeOperator
+ RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+ List<Operator<? extends OperatorDesc>> tops =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Operator<? extends OperatorDesc>> bottoms =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Integer> opTags = new ArrayList<Integer>();
+
+ for (ReduceSinkOperator rsop : rsops) {
+ TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0);
+ Operator<? extends OperatorDesc> curr = tsop.getChildOperators().get(0);
+ if (curr == rsop) {
+ // no filter needed, just forward
+ ForwardDesc forwardCtx = new ForwardDesc();
+ Operator<ForwardDesc> forwardOp = OperatorFactory.get(ForwardDesc.class);
+ forwardOp.setConf(forwardCtx);
+ tops.add(forwardOp);
+ bottoms.add(forwardOp);
+ opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop));
+ } else {
+ // Clone the chain of filter operators that sat between the table scan
+ // and this reduce sink. Rows flow from the shared input through the
+ // operator added to 'bottoms' up to the operator added to 'tops', which
+ // feeds the composite operator, so the first cloned filter must consume
+ // the input and the last cloned filter must feed the composite operator.
+ FilterOperator firstFilOp = null;
+ FilterOperator currFilOp = null;
+ while (curr != rsop) {
+ if (curr.getName().equals("FIL")) {
+ FilterOperator fil = (FilterOperator) curr;
+ FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false);
+ Operator<FilterDesc> nowFilOp = OperatorFactory.get(FilterDesc.class);
+ nowFilOp.setConf(filterCtx);
+ if (currFilOp == null) {
+ firstFilOp = (FilterOperator) nowFilOp;
+ } else {
+ nowFilOp.setParentOperators(Utilities.makeList(currFilOp));
+ currFilOp.setChildOperators(Utilities.makeList(nowFilOp));
+ }
+ currFilOp = (FilterOperator) nowFilOp;
+ }
+ curr = curr.getChildOperators().get(0);
+ }
+ if (currFilOp == null) {
+ // No filter between the table scan and the reduce sink; just forward rows.
+ ForwardDesc forwardCtx = new ForwardDesc();
+ Operator<ForwardDesc> forwardOp = OperatorFactory.get(ForwardDesc.class);
+ forwardOp.setConf(forwardCtx);
+ tops.add(forwardOp);
+ bottoms.add(forwardOp);
+ } else {
+ bottoms.add(firstFilOp);
+ tops.add(currFilOp);
+ }
+ opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop));
+
+ }
+ }
+
+ int[] opTagsArray = new int[opTags.size()];
+ for (int i = 0; i < opTags.size(); i++) {
+ opTagsArray[i] = opTags.get(i).intValue();
+ }
+
+ for (Operator<? extends OperatorDesc> op : bottoms) {
+ op.setParentOperators(Utilities.makeList(input));
+ }
+ input.setChildOperators(bottoms);
+
+ CorrelationCompositeDesc ycoCtx = new CorrelationCompositeDesc();
+ ycoCtx.setAllOperationPathTags(opTagsArray);
+
+ Operator<? extends OperatorDesc> ycop = putOpInsertMap(OperatorFactory.get(ycoCtx,
+ new RowSchema(inputRR.getColumnInfos())),
+ inputRR, pGraphContext.getOpParseCtx());
+ ycop.setParentOperators(tops);
+ for (Operator<? extends OperatorDesc> op : tops) {
+ op.setChildOperators(Utilities.makeList(ycop));
+ }
+
+ // Create CorrelationReduceSinkOperator
+ ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ ArrayList<String> keyOutputColumnNames = new ArrayList<String>();
+ ReduceSinkOperator firstRsop = rsops.get(0);
+
+ RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver();
+ RowResolver originalFirstRsopRS = originalOpRowResolver.get(firstRsop);
+ RowResolver outputRS = new RowResolver();
+ Map<String, ExprNodeColumnDesc> keyCol2ExprForDispatch =
+ new HashMap<String, ExprNodeColumnDesc>();
+ Map<String, ExprNodeColumnDesc> valueCol2ExprForDispatch =
+ new HashMap<String, ExprNodeColumnDesc>();
+
+ for (ExprNodeDesc expr : firstRsop.getConf().getKeyCols()) {
+ assert expr instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr;
+ String outputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr);
+ ColumnInfo cinfo = originalFirstRsopRS.getColumnInfos().get(
+ originalFirstRsopRS.getPosition(outputName));
+
+ String col = SemanticAnalyzer.getColumnInternalName(keyCols.size());
+ keyOutputColumnNames.add(col);
+ ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+ .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+
+ colExprMap.put(newColInfo.getInternalName(), expr);
+
+ outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+ keyCols.add(expr);
+
+ keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col,
+ tableName,
+ encd.getIsPartitionColOrVirtualCol()));
+ }
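+ // keyCol2ExprForDispatch maps each original key column name to the column
+ // emitted by the shared reduce sink; the per-path key select descriptors
+ // built below resolve their key expressions through this map.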
+
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> valueOutputColumnNames = new ArrayList<String>();
+
+ correlation.addOperationPathToDispatchConf(newTag);
+ correlation.addOperationPathToDispatchKeySelectDescConf(newTag);
+ correlation.addOperationPathToDispatchValueSelectDescConf(newTag);
+
+ for (ReduceSinkOperator rsop : rsops) {
+ LOG.debug("Analyzing ReduceSinkOperator " + rsop.getIdentifier());
+ RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver();
+ RowResolver originalRS = originalOpRowResolver.get(rsop);
+ Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0));
+ int outputTag = rsop.getConf().getTag();
+ if (outputTag == -1) {
+ outputTag = 0;
+ }
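+ // A tag of -1 (the default for a reduce sink that does not feed a join) is
+ // normalized to 0 so it can serve as a dispatch output tag.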
+ if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) {
+ correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex,
+ new ArrayList<Integer>());
+ }
+ correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag);
+
+ ArrayList<ExprNodeDesc> thisKeyColsInDispatch = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> outputKeyNamesInDispatch = new ArrayList<String>();
+ for (ExprNodeDesc expr : rsop.getConf().getKeyCols()) {
+ assert expr instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr;
+ String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+ LOG.debug("key column: " + outputName);
+ thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn()));
+ String[] names = outputName.split("\\.");
+ String outputKeyName = "";
+ switch (names.length) {
+ case 1:
+ outputKeyName = names[0];
+ break;
+ case 2:
+ outputKeyName = names[1];
+ break;
+ default:
+ throw (new SemanticException("found a un-sopported internal key name structure"));
+ }
+ outputKeyNamesInDispatch.add(outputKeyName);
+ }
+
+ if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(
+ childOpIndex)) {
+ correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex,
+ new ArrayList<SelectDesc>());
+ }
+ correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex).
+ add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false));
+
+ ArrayList<ExprNodeDesc> thisValueColsInDispatch = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> outputValueNamesInDispatch = new ArrayList<String>();
+ for (ExprNodeDesc expr : rsop.getConf().getValueCols()) {
+
+ String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+ LOG.debug("value column: " + outputName);
+ LOG.debug("originalOpColumnExprMap.get(rsop):" + originalOpColumnExprMap.get(rsop) +
+ " expr:" + expr.toString() +
+ " orginalRS.getColumnInfos().toString:" + orginalRS.getColumnInfos().toString() + " "
+ + outputName);
+ ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName));
+ if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) {
+
+ String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size());
+ valueOutputColumnNames.add(col);
+ ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+ .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+ colExprMap.put(newColInfo.getInternalName(), expr);
+ outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+ valueCols.add(expr);
+
+ valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(
+ cinfo.getType(), col, tableName,
+ false));
+ }
+
+ thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString()));
+ String[] names = outputName.split("\\.");
+ String outputValueName = "";
+ switch (names.length) {
+ case 1:
+ outputValueName = names[0];
+ break;
+ case 2:
+ outputValueName = names[1];
+ break;
+ default:
+ throw (new SemanticException("found a un-sopported internal value name structure"));
+ }
+ outputValueNamesInDispatch.add(outputValueName);
+ }
+
+ if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(
+ childOpIndex)) {
+ correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex,
+ new ArrayList<SelectDesc>());
+ }
+ correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex).
+ add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false));
+ }
+
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(
+ getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
+ new ArrayList<List<Integer>>(), keyOutputColumnNames,
+ valueOutputColumnNames, true, newTag, keyCols.size(), -1),
+ new RowSchema(outputRS.getColumnInfos()), ycop),
+ outputRS, pGraphContext.getOpParseCtx());
+ rsOp.setColumnExprMap(colExprMap);
+ ((CorrelationCompositeOperator) ycop).getConf().setCorrespondingReduceSinkOperator(rsOp);
+
+ return rsOp;
+ }
+
+ /**
+ * Generate reduce sink descriptor.
+ *
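+ * <p>For example, the correlation reduce sink above is created with a call of
+ * the following shape (variable names as in the caller):
+ *
+ * <pre>
+ * getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
+ * new ArrayList&lt;List&lt;Integer&gt;&gt;(), keyOutputColumnNames,
+ * valueOutputColumnNames, true, newTag, keyCols.size(), -1);
+ * </pre>
+ *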
+ * @param keyCols
+ * The columns to be stored in the key
+ * @param numKeys
+ * number of distribution keys; usually equal to the number
+ * of group-by keys.
+ * @param valueCols
+ * The columns to be stored in the value
+ * @param distinctColIndices
+ * column indices for distinct aggregates
+ * @param outputKeyColumnNames
+ * The output key columns names
+ * @param outputValueColumnNames
+ * The output value columns names
+ * @param tag
+ * The tag for this ReduceSinkOperator
+ * @param numPartitionFields
+ * The first numPartitionFields of keyCols will be partition columns.
+ * If numPartitionFields=-1, then partition randomly.
+ * @param numReducers
+ * The number of reducers, set to -1 for automatic inference based on
+ * input data size.
+ * @return ReduceSinkDesc.
+ */
+ public static ReduceSinkDesc getReduceSinkDesc(
+ ArrayList<ExprNodeDesc> keyCols, int numKeys,
+ ArrayList<ExprNodeDesc> valueCols,
+ List<List<Integer>> distinctColIndices,
+ ArrayList<String> outputKeyColumnNames, ArrayList<String> outputValueColumnNames,
+ boolean includeKey, int tag,
+ int numPartitionFields, int numReducers) throws SemanticException {
+ ArrayList<ExprNodeDesc> partitionCols = null;
+
+ if (numPartitionFields >= keyCols.size()) {
+ partitionCols = keyCols;
+ } else if (numPartitionFields >= 0) {
+ partitionCols = new ArrayList<ExprNodeDesc>(numPartitionFields);
+ for (int i = 0; i < numPartitionFields; i++) {
+ partitionCols.add(keyCols.get(i));
+ }
+ } else {
+ // numPartitionFields = -1 means random partitioning
+ partitionCols = new ArrayList<ExprNodeDesc>(1);
+ partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor
+ .getFuncExprNodeDesc("rand"));
+ }
+
+ StringBuilder order = new StringBuilder();
+ for (int i = 0; i < keyCols.size(); i++) {
+ order.append("+");
+ }
+
+ TableDesc keyTable = null;
+ TableDesc valueTable = null;
+ ArrayList<String> outputKeyCols = new ArrayList<String>();
+ ArrayList<String> outputValCols = new ArrayList<String>();
+ if (includeKey) {
+ keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength(
+ keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""),
+ order.toString());
+ outputKeyCols.addAll(outputKeyColumnNames);
+ } else {
+ keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ keyCols, "reducesinkkey"), order.toString());
+ for (int i = 0; i < keyCols.size(); i++) {
+ outputKeyCols.add("reducesinkkey" + i);
+ }
+ }
+ valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ valueCols, outputValueColumnNames, 0, ""));
+ outputValCols.addAll(outputValueColumnNames);
+
+ return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
+ distinctColIndices, outputValCols,
+ tag, partitionCols, numReducers, keyTable,
+ valueTable, true);
+ }
+
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sun Sep 30 20:41:01 2012
@@ -116,6 +116,11 @@ public final class GenMapRedUtils {
}
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
+ }
+ if (op.getConf().getNeedsOperationPathTagging()) {
+ plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(true);
}
assert currTopOp != null;
@@ -182,6 +187,7 @@ public final class GenMapRedUtils {
opTaskMap.put(reducer, currTask);
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
}
ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
plan.setNumReduceTasks(desc.getNumReducers());
@@ -316,6 +322,7 @@ public final class GenMapRedUtils {
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
}
initUnionPlan(opProcCtx, unionTask, false);
@@ -1066,6 +1073,7 @@ public final class GenMapRedUtils {
// dependent on the redTask
if (reducer.getClass() == JoinOperator.class) {
cplan.setNeedsTagging(true);
+ cplan.setNeedsOperationPathTagging(false);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Sun Sep 30 20:41:01 2012
@@ -46,6 +46,18 @@ public class Optimizer {
*/
public void initialize(HiveConf hiveConf) {
transformations = new ArrayList<Transform>();
+ // Add the correlation optimizer for the first phase of query plan tree analysis.
+ // The first phase records the original opColumnExprMap, opParseCtx, and opRowResolver,
+ // since these may be changed by other optimizers (e.g. entries in opColumnExprMap may be deleted).
+ // If hive.groupby.skewindata is on, the CorrelationOptimizer will not be applied.
+ // TODO: Make the correlation optimizer single-phase.
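+ // The optimizer is gated by HiveConf.ConfVars.HIVEOPTCORRELATION below
+ // (presumably surfaced as a hive.optimize.* property in hive-site.xml).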
+ CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer();
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+ // TODO: make CorrelationOptimizer compatible with SkewJoinOptimizer
+ transformations.add(correlationOptimizer);
+ }
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
@@ -83,6 +95,13 @@ public class Optimizer {
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
transformations.add(new GlobalLimitOptimizer());
}
+ // The second phase of the correlation optimizer performs correlation detection and
+ // query plan tree transformation. It should be the last optimizer added to transformations.
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+ transformations.add(correlationOptimizer);
+ }
transformations.add(new SimpleFetchOptimizer()); // must be called last
}