Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/01 07:46:55 UTC

svn commit: r1509082 [1/6] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ ql...

Author: gunther
Date: Thu Aug  1 05:46:53 2013
New Revision: 1509082

URL: http://svn.apache.org/r1509082
Log:
HIVE-4827: Merge a Map-only task to its child task (Yin Huai via Gunther Hagleitner)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin2.q
    hive/trunk/ql/src/test/results/clientpositive/multiMapJoin2.q.out
Removed:
    hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q
    hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q
    hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
    hive/trunk/ql/src/test/queries/clientpositive/union34.q
    hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join10.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join11.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join12.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join15.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join16.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join2.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join20.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join21.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join22.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join23.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join24.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join26.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join28.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join29.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_join32.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_9.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer3.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer4.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer6.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer7.q.out
    hive/trunk/ql/src/test/results/clientpositive/join28.q.out
    hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hive/trunk/ql/src/test/results/clientpositive/join_star.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_filter_on_outerjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_subquery.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_subquery2.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_test_outer.q.out
    hive/trunk/ql/src/test/results/clientpositive/multiMapJoin1.q.out
    hive/trunk/ql/src/test/results/clientpositive/multi_join_union.q.out
    hive/trunk/ql/src/test/results/clientpositive/union34.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Aug  1 05:46:53 2013
@@ -507,7 +507,6 @@ public class HiveConf extends Configurat
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
     HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size",
         10000000L),
-    HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR("hive.optimize.mapjoin.mapreduce", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
     HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M

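Note: the removed flag is superseded by the noconditionaltask settings kept above. A minimal sketch, not part of this patch, of how the two surviving variables are read (conf is an assumed Configuration instance; both accessors are the standard HiveConf ones used elsewhere in this commit):

    // Is no-conditional-task map-join conversion enabled, and what total
    // small-table size may it assume?
    boolean noConditionalTask = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
    long sizeThreshold = HiveConf.getLongVar(conf,
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
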
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Aug  1 05:46:53 2013
@@ -860,16 +860,6 @@
 </property>
 
 <property>
-  <name>hive.optimize.mapjoin.mapreduce</name>
-  <value>false</value>
-  <description>If hive.auto.convert.join is off, this parameter does not take
-    affect. If it is on, and if there are map-join jobs followed by a map-reduce
-    job (for e.g a group by), each map-only job is merged with the following
-    map-reduce job.
-  </description>
-</property>
-
-<property>
   <name>hive.script.auto.progress</name>
   <value>false</value>
   <description>Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity.  Hive sends progress information when the script is outputting to stderr.  This option removes the need to periodically produce stderr messages, but users should be cautious because this may prevent infinite loops in the scripts from being killed by TaskTracker.  </description>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Thu Aug  1 05:46:53 2013
@@ -1,9 +1,4 @@
 /**
- <<<<<<< HEAD
- =======
- * Copyright 2010 The Apache Software Foundation
- *
- >>>>>>> HIVE-1402 [jira] Add parallel ORDER BY to Hive
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,30 +25,30 @@ import java.util.Set;
 public class OperatorUtils {
 
   public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
-    return findOperator(start, clazz, new HashSet<T>());
+    return findOperators(start, clazz, new HashSet<T>());
   }
 
   public static <T> T findSingleOperator(Operator<?> start, Class<T> clazz) {
-    Set<T> found = findOperator(start, clazz, new HashSet<T>());
+    Set<T> found = findOperators(start, clazz, new HashSet<T>());
     return found.size() == 1 ? found.iterator().next() : null;
   }
 
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
-      findOperator(start, clazz, found);
+      findOperators(start, clazz, found);
     }
     return found;
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> Set<T> findOperator(Operator<?> start, Class<T> clazz, Set<T> found) {
+  private static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz, Set<T> found) {
     if (clazz.isInstance(start)) {
       found.add((T) start);
     }
     if (start.getChildOperators() != null) {
       for (Operator<?> child : start.getChildOperators()) {
-        findOperator(child, clazz, found);
+        findOperators(child, clazz, found);
       }
     }
     return found;

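The rename above makes the public findOperators entry points and the private recursive helper consistent. The helper walks from a start operator through all child operators and collects every node assignable to the requested class; findSingleOperator yields a result only when exactly one match exists. A minimal usage sketch, assuming root is an Operator<?> already in hand (CommonJoinTaskDispatcher below uses this exact pattern for TableScanOperator and FileSinkOperator):

    // Collect every FileSinkOperator reachable from root.
    Set<FileSinkOperator> sinks =
        OperatorUtils.findOperators(root, FileSinkOperator.class);
    // Returns null unless exactly one TableScanOperator is found.
    TableScanOperator scan =
        OperatorUtils.findSingleOperator(root, TableScanOperator.class);
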
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Aug  1 05:46:53 2013
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
@@ -62,7 +63,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -987,6 +987,77 @@ public final class GenMapRedUtils {
     return true;
   }
 
+
+
+  /**
+   * Replace the Map-side operator tree associated with targetAlias in
+   * target with the Map-side operator tree associated with sourceAlias in source.
+   * @param sourceAlias
+   * @param targetAlias
+   * @param source
+   * @param target
+   */
+  public static void replaceMapWork(String sourceAlias, String targetAlias,
+      MapWork source, MapWork target) {
+    Map<String, ArrayList<String>> sourcePathToAliases = source.getPathToAliases();
+    Map<String, PartitionDesc> sourcePathToPartitionInfo = source.getPathToPartitionInfo();
+    Map<String, Operator<? extends OperatorDesc>> sourceAliasToWork = source.getAliasToWork();
+    Map<String, PartitionDesc> sourceAliasToPartnInfo = source.getAliasToPartnInfo();
+
+    Map<String, ArrayList<String>> targetPathToAliases = target.getPathToAliases();
+    Map<String, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo();
+    Map<String, Operator<? extends OperatorDesc>> targetAliasToWork = target.getAliasToWork();
+    Map<String, PartitionDesc> targetAliasToPartnInfo = target.getAliasToPartnInfo();
+
+    if (!sourceAliasToWork.containsKey(sourceAlias) ||
+        !targetAliasToWork.containsKey(targetAlias)) {
+      // Nothing to do if there is no operator tree associated with
+      // sourceAlias in source, or no operator tree associated
+      // with targetAlias in target.
+      return;
+    }
+
+    if (sourceAliasToWork.size() > 1) {
+      // If there are multiple aliases in source, we do not know
+      // how to merge.
+      return;
+    }
+
+    // Remove unnecessary information from target
+    targetAliasToWork.remove(targetAlias);
+    targetAliasToPartnInfo.remove(targetAlias);
+    List<String> pathsToRemove = new ArrayList<String>();
+    for (Entry<String, ArrayList<String>> entry: targetPathToAliases.entrySet()) {
+      ArrayList<String> aliases = entry.getValue();
+      aliases.remove(targetAlias);
+      if (aliases.isEmpty()) {
+        pathsToRemove.add(entry.getKey());
+      }
+    }
+    for (String pathToRemove: pathsToRemove) {
+      targetPathToAliases.remove(pathToRemove);
+      targetPathToPartitionInfo.remove(pathToRemove);
+    }
+
+    // Add new information from source to target
+    targetAliasToWork.put(sourceAlias, sourceAliasToWork.get(sourceAlias));
+    targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo);
+    targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo);
+    List<String> pathsToAdd = new ArrayList<String>();
+    for (Entry<String, ArrayList<String>> entry: sourcePathToAliases.entrySet()) {
+      ArrayList<String> aliases = entry.getValue();
+      if (aliases.contains(sourceAlias)) {
+        pathsToAdd.add(entry.getKey());
+      }
+    }
+    for (String pathToAdd: pathsToAdd) {
+      if (!targetPathToAliases.containsKey(pathToAdd)) {
+        targetPathToAliases.put(pathToAdd, new ArrayList<String>());
+      }
+      targetPathToAliases.get(pathToAdd).add(sourceAlias);
+    }
+  }
+
   private GenMapRedUtils() {
     // prevent instantiation
   }

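replaceMapWork is the splice step invoked later in this commit (Step 2.2 of CommonJoinTaskDispatcher): the single alias of the map-join task takes over the paths and partition info that the target alias owned in the child task. A minimal sketch of the call site, reusing the variable names from that dispatcher code:

    // mapJoinMapWork must contain exactly one alias (mapJoinAlias);
    // childMRAlias is the alias in childMapWork that scanned the
    // map-join task's output directory.
    GenMapRedUtils.replaceMapWork(mapJoinAlias, childMRAlias,
        mapJoinMapWork, childMapWork);
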
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Thu Aug  1 05:46:53 2013
@@ -55,8 +55,8 @@ public class QueryPlanTreeTransformation
           throws SemanticException {
     int newTag = bottomRSToNewTag.get(rsop);
     int oldTag = rsop.getConf().getTag();
-    // if this child of dispatcher does not use tag, we just set the oldTag to 0;
     if (oldTag == -1) {
+      // if this child of DemuxOperator does not use tag, we just set the oldTag to 0.
       oldTag = 0;
     }
     Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
@@ -68,7 +68,8 @@ public class QueryPlanTreeTransformation
     rsop.getConf().setTag(newTag);
   }
 
-  /** Based on the correlation, we transform the query plan tree (operator tree).
+  /**
+   * Based on the correlation, we transform the query plan tree (operator tree).
   * Here, we first create the DemuxOperator, and all bottom ReduceSinkOperators
   * (bottom means near the TableScanOperator) in the correlation will be
   * the parents of the DemuxOperator. We also reassign tags to those

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Thu Aug  1 05:46:53 2013
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Co
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -54,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 
 /*
@@ -109,182 +110,75 @@ public class CommonJoinTaskDispatcher ex
     super(context);
   }
 
-  // Get the position of the big table for this join operator and the given alias
-  private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
-      String alias) {
-    Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
-
-    // reduceSinkOperator's child is null, but joinOperator's parents is reduceSink
-    while ((parentOp.getChildOperators() != null) &&
-        (!parentOp.getChildOperators().isEmpty())) {
-      parentOp = parentOp.getChildOperators().get(0);
-    }
-
-    return joinOp.getParentOperators().indexOf(parentOp);
-  }
-
-  /*
-   * A task and its child task has been converted from join to mapjoin.
-   * See if the two tasks can be merged.
+  /**
+   * Calculate the total size of local tables in localWork.
+   * @param localWork
+   * @return the total size of local tables, or -1 if the
+   * total size is unknown.
    */
-  private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
-    MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
-    MapWork work = task.getWork().getMapWork();
-    MapredLocalWork localWork = work.getMapLocalWork();
-    MapWork childWork = childTask.getWork().getMapWork();
-    MapredLocalWork childLocalWork = childWork.getMapLocalWork();
-
-    // Can this be merged
-    Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
-    if (aliasToWork.size() > 1) {
-      return;
-    }
-
-    Operator<? extends OperatorDesc> op = aliasToWork.values().iterator().next();
-    while (op.getChildOperators() != null) {
-      // Dont perform this optimization for multi-table inserts
-      if (op.getChildOperators().size() > 1) {
-        return;
-      }
-      op = op.getChildOperators().get(0);
-    }
-
-    if (!(op instanceof FileSinkOperator)) {
-      return;
-    }
-
-    FileSinkOperator fop = (FileSinkOperator) op;
-    String workDir = fop.getConf().getDirName();
-
-    Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
-    if (childPathToAliases.size() > 1) {
-      return;
-    }
-
-    // The filesink writes to a different directory
-    if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
-      return;
-    }
-
-    // Either of them should not be bucketed
-    if ((localWork.getBucketMapjoinContext() != null) ||
-        (childLocalWork.getBucketMapjoinContext() != null)) {
-      return;
-    }
-
-    // Merge the trees
-    if (childWork.getAliasToWork().size() > 1) {
-      return;
-    }
-
-    long mapJoinSize = HiveConf.getLongVar(conf,
-        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+  private long calculateLocalTableTotalSize(MapredLocalWork localWork) {
     long localTableTotalSize = 0;
-    for (String alias : localWork.getAliasToWork().keySet()) {
-      Long tabSize = aliasToSize.get(alias);
-      if (tabSize == null) {
-        /*
-         * if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
-         * this implies that merge cannot happen so we can return.
-         */
-        return;
-      }
-      localTableTotalSize += tabSize;
+    if (localWork == null) {
+      return localTableTotalSize;
     }
-
-    for (String alias : childLocalWork.getAliasToWork().keySet()) {
+    for (String alias : localWork.getAliasToWork().keySet()) {
       Long tabSize = aliasToSize.get(alias);
       if (tabSize == null) {
-        /*
-         * if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
-         * this implies that merge cannot happen so we can return.
-         */
-        return;
+        // If the size is unavailable, we need to assume a size greater than
+        // localTableTotalSizeLimit, which implies the merge cannot happen,
+        // so we return -1.
+        return -1;
       }
       localTableTotalSize += tabSize;
-      if (localTableTotalSize > mapJoinSize) {
-        return;
-      }
-    }
-
-    // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
-    // top of the second
-    Operator<? extends Serializable> childAliasOp =
-        childWork.getAliasToWork().values().iterator().next();
-    if (fop.getParentOperators().size() > 1) {
-      return;
     }
-    Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
-    // remove the unnecessary TableScan
-    if (childAliasOp instanceof TableScanOperator) {
-      TableScanOperator tso = (TableScanOperator)childAliasOp;
-      if (tso.getNumChild() != 1) {
-        // shouldn't happen
-        return;
-      }
-      childAliasOp = tso.getChildOperators().get(0);
-      childAliasOp.replaceParent(tso, parentFOp);
-    } else {
-      childAliasOp.setParentOperators(Utilities.makeList(parentFOp));
-    }
-    parentFOp.replaceChild(fop, childAliasOp);
+    return localTableTotalSize;
+  }
 
-    work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
-    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
-        .entrySet()) {
-      if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
-        work.getPathToPartitionInfo().put(childWorkEntry.getKey(), childWorkEntry.getValue());
+  /**
+   * Check if the total size of local tables will be under
+   * the limit after we merge the given localWorks.
+   * The limit on the total size of local tables is defined by
+   * HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD.
+   * @param conf
+   * @param localWorks
+   * @return true if the total size of local tables is known and under the limit
+   */
+  private boolean isLocalTableTotalSizeUnderLimitAfterMerge(
+      Configuration conf,
+      MapredLocalWork... localWorks) {
+    final long localTableTotalSizeLimit = HiveConf.getLongVar(conf,
+        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+    long localTableTotalSize = 0;
+    for (int i = 0; i < localWorks.length; i++) {
+      final long localWorkTableTotalSize = calculateLocalTableTotalSize(localWorks[i]);
+      if (localWorkTableTotalSize < 0) {
+        // The total size of local tables in localWorks[i] is unknown.
+        return false;
       }
+      localTableTotalSize += localWorkTableTotalSize;
     }
 
-    localWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
-    localWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
-
-    // remove the child task
-    List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
-    task.setChildTasks(oldChildTasks);
-    if (oldChildTasks != null) {
-      for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
-        oldChildTask.getParentTasks().remove(childTask);
-        oldChildTask.getParentTasks().add(task);
-      }
+    if (localTableTotalSize > localTableTotalSizeLimit) {
+      // The total size of local tables after we merge localWorks
+      // is larger than the limit set by
+      // HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD.
+      return false;
     }
 
-    boolean convertToSingleJob = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
-    if (convertToSingleJob) {
-      copyReducerConf(task, childTask);
-    }
+    return true;
   }
 
-  /**
-   * Copy reducer configuration if the childTask also has a reducer.
-   *
-   * @param task
-   * @param childTask
-   */
-  private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
-    MapredWork mrChildWork = childTask.getWork();
-    ReduceWork childWork = childTask.getWork().getReduceWork();
-    if (childWork == null) {
-      return;
-    }
+  // Get the position of the big table for this join operator and the given alias
+  private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
+      String alias) {
+    Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
 
-    Operator childReducer = childWork.getReducer();
-    MapredWork work = task.getWork();
-    if (childReducer == null) {
-      return;
+    // the ReduceSinkOperator's child is null here, but the JoinOperator's parents are ReduceSinks
+    while ((parentOp.getChildOperators() != null) &&
+        (!parentOp.getChildOperators().isEmpty())) {
+      parentOp = parentOp.getChildOperators().get(0);
     }
-    ReduceWork rWork = new ReduceWork();
-    work.setReduceWork(rWork);
-    rWork.setReducer(childReducer);
-    rWork.setNumReduceTasks(childWork.getNumReduceTasks());
-    work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
-    rWork.setNeedsTagging(childWork.getNeedsTagging());
-
-    // Make sure the key configuration is correct, clear and regenerate.
-    rWork.getTagToValueDesc().clear();
-    GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
+    return joinOp.getParentOperators().indexOf(parentOp);
   }
 
   // create map join task and set big table as bigTablePosition
@@ -305,129 +199,165 @@ public class CommonJoinTaskDispatcher ex
   * A task and its child task have been converted from join to mapjoin.
    * See if the two tasks can be merged.
    */
-  private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configuration conf) {
+  private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Configuration conf)
+      throws SemanticException {
+    // Step 1: Check if mapJoinTask has a single child.
+    // If so, check if we can merge mapJoinTask into that child.
     if (mapJoinTask.getChildTasks() == null
         || mapJoinTask.getChildTasks().size() > 1) {
      // No child task to merge, so nothing to do; or there is more than one
      // child task, in which case we don't want to do anything.
       return;
     }
-    Task<? extends Serializable> firstChildTask = mapJoinTask.getChildTasks().get(0);
-    if (!(firstChildTask instanceof MapRedTask)) {
-      // Nothing to do if it is not a mapreduce task.
-      return;
-    }
-    MapRedTask childTask = (MapRedTask) firstChildTask;
-    MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
-    MapredWork childWork = childTask.getWork();
-    if (childWork.getReduceWork() == null) {
-      // Not a MR job, nothing to merge.
-      return;
-    }
 
-    // Can this be merged
-    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapJoinWork.getAliasToWork();
-    if (aliasToWork.size() > 1) {
-      return;
-    }
-    Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
-    if (childPathToAliases.size() > 1) {
+    Task<? extends Serializable> childTask = mapJoinTask.getChildTasks().get(0);
+    if (!(childTask instanceof MapRedTask)) {
+      // Nothing to do if it is not a MapReduce task.
       return;
     }
 
-    // Locate leaf operator of the map-join task. Start by initializing leaf
-    // operator to be root operator.
-    Operator<? extends OperatorDesc> mapJoinLeafOperator = aliasToWork.values().iterator().next();
-    while (mapJoinLeafOperator.getChildOperators() != null) {
-      // Dont perform this optimization for multi-table inserts
-      if (mapJoinLeafOperator.getChildOperators().size() > 1) {
-        return;
-      }
-      mapJoinLeafOperator = mapJoinLeafOperator.getChildOperators().get(0);
-    }
+    MapRedTask childMapRedTask = (MapRedTask) childTask;
+    MapWork mapJoinMapWork = mapJoinTask.getWork().getMapWork();
+    MapWork childMapWork = childMapRedTask.getWork().getMapWork();
 
-    assert (mapJoinLeafOperator instanceof FileSinkOperator);
-    if (!(mapJoinLeafOperator instanceof FileSinkOperator)) {
-      // Sanity check, shouldn't happen.
+    Map<String, Operator<? extends OperatorDesc>> mapJoinAliasToWork =
+        mapJoinMapWork.getAliasToWork();
+    if (mapJoinAliasToWork.size() > 1) {
+      // Do not merge if the MapredWork of MapJoin has multiple input aliases.
       return;
     }
 
-    FileSinkOperator mapJoinTaskFileSinkOperator = (FileSinkOperator) mapJoinLeafOperator;
+    Entry<String, Operator<? extends OperatorDesc>> mapJoinAliasToWorkEntry =
+        mapJoinAliasToWork.entrySet().iterator().next();
+    String mapJoinAlias = mapJoinAliasToWorkEntry.getKey();
+    TableScanOperator mapJoinTaskTableScanOperator =
+        OperatorUtils.findSingleOperator(
+            mapJoinAliasToWorkEntry.getValue(), TableScanOperator.class);
+    if (mapJoinTaskTableScanOperator == null) {
+      throw new SemanticException("Expected a " + TableScanOperator.getOperatorName() +
+          " operator as the work associated with alias " + mapJoinAlias +
+          ". Found a " + mapJoinAliasToWork.get(mapJoinAlias).getName() + " operator.");
+    }
+    FileSinkOperator mapJoinTaskFileSinkOperator =
+        OperatorUtils.findSingleOperator(
+            mapJoinTaskTableScanOperator, FileSinkOperator.class);
+    if (mapJoinTaskFileSinkOperator == null) {
+      throw new SemanticException("Cannot find the " + FileSinkOperator.getOperatorName() +
+          " operator at the last operator of the MapJoin Task.");
+    }
 
-    // The filesink writes to a different directory
-    String workDir = mapJoinTaskFileSinkOperator.getConf().getDirName();
-    if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
+    // Find the single alias in the child MR task that scans the directory written by mapJoinTaskFileSinkOperator
+    String childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName();
+    List<String> childMRAliases = childMapWork.getPathToAliases().get(childMRPath);
+    if (childMRAliases == null || childMRAliases.size() != 1) {
       return;
     }
+    String childMRAlias = childMRAliases.get(0);
 
-    MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
-    MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
+    MapredLocalWork mapJoinLocalWork = mapJoinMapWork.getMapLocalWork();
+    MapredLocalWork childLocalWork = childMapWork.getMapLocalWork();
 
-    // Either of them should not be bucketed
     if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
         (childLocalWork != null && childLocalWork.getBucketMapjoinContext() != null)) {
+      // Right now, we do not handle the case where either of them is bucketed.
+      // We should relax this constraint in a follow-up JIRA.
       return;
     }
 
-    if (childWork.getMapWork().getAliasToWork().size() > 1) {
-      return;
-    }
-
-    Operator<? extends Serializable> childAliasOp =
-        childWork.getMapWork().getAliasToWork().values().iterator().next();
-    if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
-      return;
-    }
-
-    // remove the unnecessary TableScan
-    if (childAliasOp instanceof TableScanOperator) {
-      TableScanOperator tso = (TableScanOperator)childAliasOp;
-      if (tso.getNumChild() != 1) {
-        // shouldn't happen
-        return;
-      }
-      childAliasOp = tso.getChildOperators().get(0);
-      childAliasOp.getParentOperators().remove(tso);
-    }
-
-    // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
-    // top of the second
-    Operator<? extends Serializable> parentFOp = mapJoinTaskFileSinkOperator
-        .getParentOperators().get(0);
-    parentFOp.getChildOperators().remove(mapJoinTaskFileSinkOperator);
-    parentFOp.getChildOperators().add(childAliasOp);
-    List<Operator<? extends OperatorDesc>> parentOps =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-    parentOps.add(parentFOp);
-    childAliasOp.setParentOperators(parentOps);
-
-    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
-    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
-        .entrySet()) {
-      if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
-        mapJoinWork.getPathToPartitionInfo()
-            .put(childWorkEntry.getKey(), childWorkEntry.getValue());
+    // We need to check if the total size of local tables is under the limit.
+    // Here, we use a conservative condition: the total size of
+    // local tables used by all input paths. We could relax this condition
+    // to check the total size of local tables per input path.
+    // Example:
+    //               UNION_ALL
+    //              /         \
+    //             /           \
+    //            /             \
+    //           /               \
+    //       MapJoin1          MapJoin2
+    //      /   |   \         /   |   \
+    //     /    |    \       /    |    \
+    //   Big1   S1   S2    Big2   S3   S4
+    // In this case, we have two MapJoins, MapJoin1 and MapJoin2. Big1 and Big2 are two
+    // big tables, and S1, S2, S3, and S4 are four small tables. Hash tables of S1 and S2
+    // will only be used by Map tasks processing Big1. Hash tables of S3 and S4 will only
+    // be used by Map tasks processing Big2. If Big1!=Big2, we should only check if the size
+    // of S1 + S2 is under the limit, and if the size of S3 + S4 is under the limit.
+    // But, right now, we check whether the size of S1 + S2 + S3 + S4 is under the limit.
+    // If Big1=Big2, we will only scan a path once. So, MapJoin1 and MapJoin2 will be executed
+    // in the same Map task. In this case, we need to make sure the size of S1 + S2 + S3 + S4
+    // is under the limit.
+    if (!isLocalTableTotalSizeUnderLimitAfterMerge(conf, mapJoinLocalWork, childLocalWork)) {
+      // The total size of local tables may not be under
+      // the limit after we merge mapJoinLocalWork and childLocalWork.
+      // Do not merge.
+      return;
+    }
+
+    TableScanOperator childMRTaskTableScanOperator =
+        OperatorUtils.findSingleOperator(
+            childMapWork.getAliasToWork().get(childMRAlias), TableScanOperator.class);
+    if (childMRTaskTableScanOperator == null) {
+      throw new SemanticException("Expected a " + TableScanOperator.getOperatorName() +
+          " operator as the work associated with alias " + childMRAlias +
+          ". Found a " + childMapWork.getAliasToWork().get(childMRAlias).getName() + " operator.");
+    }
+
+    List<Operator<? extends OperatorDesc>> parentsInMapJoinTask =
+        mapJoinTaskFileSinkOperator.getParentOperators();
+    List<Operator<? extends OperatorDesc>> childrenInChildMRTask =
+        childMRTaskTableScanOperator.getChildOperators();
+    if (parentsInMapJoinTask.size() > 1 || childrenInChildMRTask.size() > 1) {
+      // Do not merge if we do not know how to connect two operator trees.
+      return;
+    }
+
+    // Step 2: Merge mapJoinTask into the Map-side of its child.
+    // Step 2.1: Connect the operator trees of two MapRedTasks.
+    Operator<? extends OperatorDesc> parentInMapJoinTask = parentsInMapJoinTask.get(0);
+    Operator<? extends OperatorDesc> childInChildMRTask = childrenInChildMRTask.get(0);
+    parentInMapJoinTask.replaceChild(mapJoinTaskFileSinkOperator, childInChildMRTask);
+    childInChildMRTask.replaceParent(childMRTaskTableScanOperator, parentInMapJoinTask);
+
+    // Step 2.2: Replace the corresponding part of the child task's MapWork.
+    GenMapRedUtils.replaceMapWork(mapJoinAlias, childMRAlias, mapJoinMapWork, childMapWork);
+
+    // Step 2.3: Merge the local work of mapJoinTask into the child task's local work
+    if (mapJoinLocalWork != null) {
+      if (childLocalWork == null) {
+        childMapWork.setMapLocalWork(mapJoinLocalWork);
+      } else {
+        childLocalWork.getAliasToFetchWork().putAll(mapJoinLocalWork.getAliasToFetchWork());
+        childLocalWork.getAliasToWork().putAll(mapJoinLocalWork.getAliasToWork());
+      }
+    }
+
+    // Step 2.4: Remove this MapJoin task
+    List<Task<? extends Serializable>> parentTasks = mapJoinTask.getParentTasks();
+    mapJoinTask.setParentTasks(null);
+    mapJoinTask.setChildTasks(null);
+    childMapRedTask.getParentTasks().remove(mapJoinTask);
+    if (parentTasks != null) {
+      childMapRedTask.getParentTasks().addAll(parentTasks);
+      for (Task<? extends Serializable> parentTask : parentTasks) {
+        parentTask.getChildTasks().remove(mapJoinTask);
+        if (!parentTask.getChildTasks().contains(childMapRedTask)) {
+          parentTask.getChildTasks().add(childMapRedTask);
+        }
       }
-    }
-
-    // Fill up stuff in local work
-    if (mapJoinLocalWork != null && childLocalWork != null) {
-      mapJoinLocalWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
-      mapJoinLocalWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
-    }
-
-    // remove the child task
-    List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
-    mapJoinTask.setChildTasks(oldChildTasks);
-    if (oldChildTasks != null) {
-      for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
-        oldChildTask.getParentTasks().remove(childTask);
-        oldChildTask.getParentTasks().add(mapJoinTask);
+    } else {
+      if (physicalContext.getRootTasks().contains(mapJoinTask)) {
+        physicalContext.removeFromRootTask(mapJoinTask);
+        if (childMapRedTask.getParentTasks() != null &&
+            childMapRedTask.getParentTasks().size() == 0 &&
+            !physicalContext.getRootTasks().contains(childMapRedTask)) {
+          physicalContext.addToRootTask(childMapRedTask);
+        }
       }
     }
-
-    // Copy the reducer conf.
-    copyReducerConf(mapJoinTask, childTask);
+    if (childMapRedTask.getParentTasks().size() == 0) {
+      childMapRedTask.setParentTasks(null);
+    }
   }
 
   public static boolean cannotConvert(String bigTableAlias,
@@ -557,20 +487,7 @@ public class CommonJoinTaskDispatcher ex
         // Can this task be merged with the child task. This can happen if a big table is being
         // joined with multiple small tables on different keys
         if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)) {
-          if (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP) {
-            // Merging two map-join tasks
-            mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
-          }
-
-          // Converted the join operator into a map-join. Now see if it can
-          // be merged into the following map-reduce job.
-          boolean convertToSingleJob = HiveConf.getBoolVar(conf,
-              HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
-          if (convertToSingleJob) {
-            // Try merging a map-join task with a mapreduce job to have a
-            // single job.
-            mergeMapJoinTaskWithMapReduceTask(newTask, conf);
-          }
+          mergeMapJoinTaskIntoItsChildMapRedTask(newTask, conf);
         }
 
         return newTask;

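The merge above is gated on the combined small-table size staying within hive.auto.convert.join.noconditionaltask.size, and any unknown table size vetoes it. A self-contained sketch of that gate, with aliasToSize and threshold as assumed stand-ins for the dispatcher's fields:

    // Returns true only when every local-table size is known and the sum
    // stays within the threshold; a null size means unknown, so no merge.
    static boolean totalSizeUnderLimit(Map<String, Long> aliasToSize,
        Iterable<String> aliases, long threshold) {
      long total = 0;
      for (String alias : aliases) {
        Long size = aliasToSize.get(alias);
        if (size == null) {
          return false;
        }
        total += size;
      }
      return total <= threshold;
    }
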
Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q Thu Aug  1 05:46:53 2013
@@ -33,23 +33,6 @@ set hive.optimize.correlation=true;
 -- Enable hive.auto.convert.join.
 -- Correlation Optimizer will detect that the join will be converted to a Map-join,
 -- so it will not try to optimize this query.
-EXPLAIN
-SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
-FROM (SELECT x.key AS key, count(1) AS cnt
-      FROM src1 x JOIN src y ON (x.key = y.key)
-      GROUP BY x.key) tmp;
-
-SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
-FROM (SELECT x.key AS key, count(1) AS cnt
-      FROM src1 x JOIN src y ON (x.key = y.key)
-      GROUP BY x.key) tmp;
-      
-set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
-set hive.optimize.correlation=true;
--- Enable hive.auto.convert.join.
--- Correlation Optimizer will detect that the join will be converted to a Map-join,
--- so it will not try to optimize this query.
 -- We should generate 1 MR job for subquery tmp.
 EXPLAIN
 SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))

Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q Thu Aug  1 05:46:53 2013
@@ -36,7 +36,6 @@ FROM (SELECT b.key AS key, b.cnt AS cnt,
 
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 -- Enable hive.auto.convert.join.
 EXPLAIN
 SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
@@ -79,10 +78,9 @@ FROM (SELECT d.key AS key, d.cnt AS cnt,
       FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
       JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
       ON b.key = d.key) tmp;
-      
+
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 -- Enable hive.auto.convert.join.
 EXPLAIN
 SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))

Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q Thu Aug  1 05:46:53 2013
@@ -33,10 +33,9 @@ SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.
 FROM (SELECT y.key AS key, count(1) AS cnt
       FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
       GROUP BY y.key) tmp;
-      
+
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 -- Enable hive.auto.convert.join.
 EXPLAIN
 SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))

Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q Thu Aug  1 05:46:53 2013
@@ -52,7 +52,6 @@ ON b.key = d.key;
 
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 set hive.auto.convert.join.noconditionaltask.size=10000000000;
 -- Enable hive.auto.convert.join.
 EXPLAIN

Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q Thu Aug  1 05:46:53 2013
@@ -36,7 +36,6 @@ ON xx.key=yy.key ORDER BY xx.key, xx.cnt
 
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 -- Enable hive.auto.convert.join.
 EXPLAIN
 SELECT xx.key, xx.cnt, yy.key, yy.cnt
@@ -306,7 +305,6 @@ ON xx.key=yy.key ORDER BY xx.key, xx.cnt
 
 set hive.optimize.correlation=true;
 set hive.auto.convert.join=true;
-set hive.optimize.mapjoin.mapreduce=true;
 EXPLAIN
 SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
 FROM

Modified: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q Thu Aug  1 05:46:53 2013
@@ -36,19 +36,14 @@ ON xx.key=yy.key ORDER BY xx.key, xx.cnt
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000000000;
-set hive.optimize.mapjoin.mapreduce=true;
 
 set hive.optimize.correlation=false;
 -- Without correlation optimizer, we will have 3 MR jobs.
 -- The first one is a MapJoin and Aggregation (in the Reduce Phase).
 -- The second one is another MapJoin. The third one is for ordering.
--- With the correlation optimizer, right now, we still have
--- 3 MR jobs. The first one is a MapJoin and the map-side aggregation (a map-only job).
--- The second one have the reduce-side aggregation and the second join.
--- The third one is for ordering.
--- Although we have turned on hive.optimize.mapjoin.mapreduce, that optimizer
--- can not handle the case that the MR job (the one which a map-only job will be merged in)
--- has multiple inputs. We should improve that optimizer.
+-- With the correlation optimizer, right now, we have
+-- 2 MR jobs. The first one will evaluate the sub-query xx and the join of
+-- xx and yy. The second one will do the ORDER BY.
 EXPLAIN
 SELECT xx.key, xx.cnt, yy.key, yy.value
 FROM (SELECT x.key AS key, count(1) AS cnt

Modified: hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q Thu Aug  1 05:46:53 2013
@@ -1,10 +1,15 @@
--- Join of a big table with 2 small tables on different keys should be performed as a single MR job
 create table smallTbl1(key string, value string);
 insert overwrite table smallTbl1 select * from src where key < 10;
 
 create table smallTbl2(key string, value string);
 insert overwrite table smallTbl2 select * from src where key < 10;
 
+create table smallTbl3(key string, value string);
+insert overwrite table smallTbl3 select * from src where key < 10;
+
+create table smallTbl4(key string, value string);
+insert overwrite table smallTbl4 select * from src where key < 10;
+
 create table bigTbl(key string, value string);
 insert overwrite table bigTbl
 select * from
@@ -68,37 +73,30 @@ select count(*) FROM
  bigTbl.value as value2 FROM bigTbl JOIN smallTbl1 
  on (bigTbl.key = smallTbl1.key)
 ) firstjoin
-JOIN                                                                  
+JOIN
 smallTbl2 on (firstjoin.value1 = smallTbl2.value);
 
-set hive.optimize.mapjoin.mapreduce=true;
-
 -- Now run a query with two-way join, which should first be converted into a
 -- map-join followed by groupby and then finally into a single MR job.
 
-explain insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+explain
 select count(*) FROM
 (select bigTbl.key as key, bigTbl.value as value1,
- bigTbl.value as value2 FROM bigTbl JOIN smallTbl1 
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
  on (bigTbl.key = smallTbl1.key)
 ) firstjoin
-JOIN                                                                  
+JOIN
 smallTbl2 on (firstjoin.value1 = smallTbl2.value)
 group by smallTbl2.key;
 
-insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
 select count(*) FROM
 (select bigTbl.key as key, bigTbl.value as value1,
- bigTbl.value as value2 FROM bigTbl JOIN smallTbl1 
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
  on (bigTbl.key = smallTbl1.key)
 ) firstjoin
-JOIN                                                                  
+JOIN
 smallTbl2 on (firstjoin.value1 = smallTbl2.value)
 group by smallTbl2.key;
-set hive.optimize.mapjoin.mapreduce=false;
-
-create table smallTbl3(key string, value string);
-insert overwrite table smallTbl3 select * from src where key < 10;
 
 drop table bigTbl;
 
@@ -128,100 +126,276 @@ select * from
 ) subq;
 
 set hive.auto.convert.join.noconditionaltask=false;
-
-explain
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
-
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+-- First disable noconditionaltask
+EXPLAIN
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
 
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
-
--- join with 4 tables on different keys is also executed as a single MR job,
--- So, overall two jobs - one for multi-way join and one for count(*)
-explain
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
-
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
-
-set hive.optimize.mapjoin.mapreduce=true;
--- Now run the above query with M-MR optimization
--- This should be a single MR job end-to-end.
-explain
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
-
-select count(*) FROM
- (
-   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
-          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
-    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
-            bigTbl.value as value1, bigTbl.value as value2 
-     FROM bigTbl JOIN smallTbl1 
-     on (bigTbl.key1 = smallTbl1.key)
-    ) firstjoin
-    JOIN                                                                  
-    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
- ) secondjoin
- JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
-
-set hive.optimize.mapjoin.mapreduce=false;
+-- Enable noconditionaltask and set hive.auto.convert.join.noconditionaltask.size
+-- to 10000, which is large enough to fit all four small tables (smallTbl1 to smallTbl4).
+-- We will use a single MR job to evaluate this query.
+EXPLAIN
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+set hive.auto.convert.join.noconditionaltask.size=200;
+-- Enable noconditionaltask and set hive.auto.convert.join.noconditionaltask.size
+-- to 200, which is only large enough to fit two of the small tables. We will use two jobs to
+-- evaluate this query. The first job is a Map-only job that evaluates join1 and join2.
+-- The second job will evaluate the rest of this query.
+EXPLAIN
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+set hive.auto.convert.join.noconditionaltask.size=0;
+-- Enable noconditionaltask but set hive.auto.convert.join.noconditionaltask.size
+-- to 0. The plan will be the same as the one generated when noconditionaltask is disabled.
+EXPLAIN
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
+
+SELECT SUM(HASH(join3.key1)),
+       SUM(HASH(join3.key2)),
+       SUM(HASH(join3.key3)),
+       SUM(HASH(join3.key4)),
+       SUM(HASH(join3.key5)),
+       SUM(HASH(smallTbl4.key)),
+       SUM(HASH(join3.value1)),
+       SUM(HASH(join3.value2))
+FROM (SELECT join2.key1 as key1,
+             join2.key2 as key2,
+             join2.key3 as key3,
+             join2.key4 as key4,
+             smallTbl3.key as key5,
+             join2.value1 as value1,
+             join2.value2 as value2
+      FROM (SELECT join1.key1 as key1,
+                   join1.key2 as key2,
+                   join1.key3 as key3,
+                   smallTbl2.key as key4,
+                   join1.value1 as value1,
+                   join1.value2 as value2
+            FROM (SELECT bigTbl.key1 as key1,
+                         bigTbl.key2 as key2,
+                         smallTbl1.key as key3,
+                         bigTbl.value as value1,
+                         bigTbl.value as value2
+                  FROM bigTbl
+                  JOIN smallTbl1 ON (bigTbl.key1 = smallTbl1.key)) join1
+            JOIN smallTbl2 ON (join1.value1 = smallTbl2.value)) join2
+      JOIN smallTbl3 ON (join2.key2 = smallTbl3.key)) join3
+JOIN smallTbl4 ON (join3.key3 = smallTbl4.key);
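To make the threshold behavior exercised above easier to reproduce in isolation, here is a
minimal, hedged sketch; the tables tiny_a, tiny_b, and big_t and the 100-byte limit are
hypothetical stand-ins, not part of this patch. When the summed size of all small tables
fits under hive.auto.convert.join.noconditionaltask.size, both map joins land in a single
map-only task; when it does not, they are split across tasks.

set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-- Hypothetical limit: merge the map joins only if the small tables fit in 100 bytes total.
set hive.auto.convert.join.noconditionaltask.size=100;

-- tiny_a and tiny_b are hypothetical small tables; big_t is the streamed big table.
EXPLAIN
SELECT count(*)
FROM big_t b
JOIN tiny_a a ON (b.key = a.key)
JOIN tiny_b c ON (b.key = c.key);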

Added: hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin2.q?rev=1509082&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin2.q Thu Aug  1 05:46:53 2013
@@ -0,0 +1,189 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=6000;
+
+-- We will generate one MR job to evaluate this query.
+EXPLAIN
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+set hive.auto.convert.join.noconditionaltask.size=400;
+-- Check whether the total size of the local tables would be
+-- larger than the limit that we set through
+-- hive.auto.convert.join.noconditionaltask.size (right now, it is
+-- 400 bytes). If so, do not merge the MapJoins into a single job.
+-- For this query, we will merge the MapJoin of x2 and y2 into the MR job
+-- for the UNION ALL and ORDER BY. But the MapJoin of x1 and y1 will not be merged
+-- into that MR job.
+EXPLAIN
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+set hive.auto.convert.join.noconditionaltask.size=6000;
+-- We will use two jobs.
+-- We will generate one MR job for the GROUP BY
+-- on x1, and one MR job for the MapJoin of x2 and y2, the UNION ALL, and the
+-- ORDER BY.
+EXPLAIN
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src1 x1 GROUP BY x1.key
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+SELECT tmp.key
+FROM (SELECT x1.key AS key FROM src1 x1 GROUP BY x1.key
+      UNION ALL
+      SELECT x2.key AS key FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)) tmp
+ORDER BY tmp.key;
+
+set hive.optimize.correlation=false;
+-- When the Correlation Optimizer is disabled,
+-- we will use five jobs.
+-- We will generate one MR job to evaluate the sub-query tmp1,
+-- one MR job to evaluate the sub-query tmp2,
+-- one MR job for the Join of tmp1 and tmp2,
+-- one MR job for aggregation on the result of the Join of tmp1 and tmp2,
+-- and one MR job for the ORDER BY.
+EXPLAIN
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+set hive.optimize.correlation=true;
+-- When the Correlation Optimizer is enabled,
+-- we will use two jobs. The first MR job will evaluate the sub-queries tmp1 and tmp2,
+-- the Join of tmp1 and tmp2, and the aggregation on the result of the Join of
+-- tmp1 and tmp2. The second job will do the ORDER BY.
+EXPLAIN
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src x1 JOIN src1 y1 ON (x1.key = y1.key)
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+set hive.optimize.correlation=false;
+-- When the Correlation Optimizer is disabled,
+-- we will use five jobs.
+-- We will generate one MR job to evaluate the sub-query tmp1,
+-- one MR job to evaluate the sub-query tmp2,
+-- one MR job for the Join of tmp1 and tmp2,
+-- one MR job for aggregation on the result of the Join of tmp1 and tmp2,
+-- and one MR job for the ORDER BY.
+EXPLAIN
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src1 x1
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src1 x1
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+set hive.optimize.correlation=true;
+-- When the Correlation Optimizer is enabled,
+-- we will use two jobs. The first MR job will evaluate the sub-queries tmp1 and tmp2,
+-- the Join of tmp1 and tmp2, and the aggregation on the result of the Join of
+-- tmp1 and tmp2. The second job will do the ORDER BY.
+EXPLAIN
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src1 x1
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+SELECT tmp1.key as key, count(*) as cnt
+FROM (SELECT x1.key AS key
+      FROM src1 x1
+      GROUP BY x1.key) tmp1
+JOIN (SELECT x2.key AS key
+      FROM src x2 JOIN src1 y2 ON (x2.key = y2.key)
+      GROUP BY x2.key) tmp2
+ON (tmp1.key = tmp2.key)
+GROUP BY tmp1.key
+ORDER BY key, cnt;
+
+-- Check if we can correctly handle partitioned tables.
+CREATE TABLE part_table(key string, value string) PARTITIONED BY (partitionId int);
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=1)
+  SELECT key, value FROM src ORDER BY key, value LIMIT 100;
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=2)
+  SELECT key, value FROM src1 ORDER BY key, value;
+
+EXPLAIN
+SELECT count(*)
+FROM part_table x JOIN src1 y ON (x.key = y.key);
+
+SELECT count(*)
+FROM part_table x JOIN src1 y ON (x.key = y.key);
+
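As a hedged aside on the hive.optimize.correlation runs above: the table t and its column k
below are hypothetical, but the shape matches the tmp1/tmp2 queries. Because both GROUP BYs
and the Join share the key k, the Correlation Optimizer can reuse one shuffle on k for all
of them instead of shuffling once per operator.

set hive.optimize.correlation=true;
-- t(k) is a hypothetical table; both branches group and join on the same key k,
-- so a single shuffle on k serves the two GROUP BYs and the JOIN.
EXPLAIN
SELECT a.k, count(*)
FROM (SELECT k FROM t GROUP BY k) a
JOIN (SELECT k FROM t GROUP BY k) b ON (a.k = b.k)
GROUP BY a.k;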

Modified: hive/trunk/ql/src/test/queries/clientpositive/union34.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/union34.q?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/union34.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/union34.q Thu Aug  1 05:46:53 2013
@@ -1,11 +1,3 @@
--- HIVE-4342
--- Maponly union(UNION-13) is merged into non-maponly union(UNION-15)
--- In this case, task for UNION-13 should be removed from top-task and merged into task for UNION-15
--- TS[2]-SEL[3]-RS[5]-JOIN[6]-SEL[7]-UNION[15]-SEL[16]-RS[17]-EX[18]-FS[19]
--- TS[0]-SEL[1]-RS[4]-JOIN[6]
--- TS[8]-SEL[9]-UNION[13]-SEL[14]-UNION[15]
--- TS[11]-SEL[12]-UNION[13]
-
 create table src10_1 (key string, value string);
 create table src10_2 (key string, value string);
 create table src10_3 (key string, value string);
@@ -18,7 +10,8 @@ insert overwrite table src10_3 select *
 insert overwrite table src10_4 select *;
 
 set hive.auto.convert.join=true;
-
+-- When we convert the Join of sub1 and sub0 into a MapJoin,
+-- we can use a single MR job to evaluate this entire query.
 explain
 SELECT * FROM (
   SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)
@@ -33,7 +26,10 @@ SELECT * FROM (
 ) alias1 order by key;
 
 set hive.auto.convert.join=false;
-
+-- When we do not convert the Join of sub1 and sub0 into a MapJoin,
+-- we need to use two MR jobs to evaluate this query.
+-- The first job is for the Join of sub1 and sub0. The second job
+-- is for the UNION ALL and ORDER BY.
 explain
 SELECT * FROM (
   SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out Thu Aug  1 05:46:53 2013
@@ -25,8 +25,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-6 is a root stage
-  Stage-5 depends on stages: Stage-6
-  Stage-2 depends on stages: Stage-5
+  Stage-2 depends on stages: Stage-6
   Stage-3 depends on stages: Stage-2
   Stage-0 is a root stage
 
@@ -62,7 +61,7 @@ STAGE PLANS:
                     1 []
                   Position of Big Table: 0
 
-  Stage: Stage-5
+  Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
         a:src1:src 
@@ -102,40 +101,29 @@ STAGE PLANS:
                           expr: _col3
                           type: string
                     outputColumnNames: _col0, _col1, _col2, _col3
-                    File Output Operator
-                      compressed: false
-                      GlobalTableId: 0
-                      table:
-                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    Reduce Output Operator
+                      key expressions:
+                            expr: _col0
+                            type: string
+                            expr: _col1
+                            type: string
+                            expr: _col2
+                            type: string
+                            expr: _col3
+                            type: string
+                      sort order: ++++
+                      tag: -1
+                      value expressions:
+                            expr: _col0
+                            type: string
+                            expr: _col1
+                            type: string
+                            expr: _col2
+                            type: string
+                            expr: _col3
+                            type: string
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-                    expr: _col2
-                    type: string
-                    expr: _col3
-                    type: string
-              sort order: ++++
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-                    expr: _col2
-                    type: string
-                    expr: _col3
-                    type: string
       Reduce Operator Tree:
         Extract
           Select Operator

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join10.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join10.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join10.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join10.q.out Thu Aug  1 05:46:53 2013
@@ -19,8 +19,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-5 is a root stage
-  Stage-4 depends on stages: Stage-5
-  Stage-2 depends on stages: Stage-4
+  Stage-2 depends on stages: Stage-5
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -51,7 +50,7 @@ STAGE PLANS:
                   1 [Column[_col0]]
                 Position of Big Table: 0
 
-  Stage: Stage-4
+  Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
         x:src 
@@ -87,25 +86,14 @@ STAGE PLANS:
                     bucketGroup: false
                     mode: hash
                     outputColumnNames: _col0
-                    File Output Operator
-                      compressed: false
-                      GlobalTableId: 0
-                      table:
-                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    Reduce Output Operator
+                      sort order: 
+                      tag: -1
+                      value expressions:
+                            expr: _col0
+                            type: bigint
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              sort order: 
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: bigint
       Reduce Operator Tree:
         Group By Operator
           aggregations:

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join11.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join11.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join11.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join11.q.out Thu Aug  1 05:46:53 2013
@@ -19,8 +19,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-5 is a root stage
-  Stage-4 depends on stages: Stage-5
-  Stage-2 depends on stages: Stage-4
+  Stage-2 depends on stages: Stage-5
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -53,7 +52,7 @@ STAGE PLANS:
                     1 [Column[_col0]]
                   Position of Big Table: 1
 
-  Stage: Stage-4
+  Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
         src2:src 
@@ -95,25 +94,14 @@ STAGE PLANS:
                       bucketGroup: false
                       mode: hash
                       outputColumnNames: _col0
-                      File Output Operator
-                        compressed: false
-                        GlobalTableId: 0
-                        table:
-                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      Reduce Output Operator
+                        sort order: 
+                        tag: -1
+                        value expressions:
+                              expr: _col0
+                              type: bigint
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              sort order: 
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: bigint
       Reduce Operator Tree:
         Group By Operator
           aggregations:

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join12.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join12.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join12.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join12.q.out Thu Aug  1 05:46:53 2013
@@ -25,8 +25,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-6 is a root stage
-  Stage-5 depends on stages: Stage-6
-  Stage-2 depends on stages: Stage-5
+  Stage-2 depends on stages: Stage-6
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -87,7 +86,7 @@ STAGE PLANS:
                     2 [Column[_col0]]
                   Position of Big Table: 1
 
-  Stage: Stage-5
+  Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
         src2:src 
@@ -132,25 +131,14 @@ STAGE PLANS:
                       bucketGroup: false
                       mode: hash
                       outputColumnNames: _col0
-                      File Output Operator
-                        compressed: false
-                        GlobalTableId: 0
-                        table:
-                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      Reduce Output Operator
+                        sort order: 
+                        tag: -1
+                        value expressions:
+                              expr: _col0
+                              type: bigint
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              sort order: 
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: bigint
       Reduce Operator Tree:
         Group By Operator
           aggregations:

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out Thu Aug  1 05:46:53 2013
@@ -25,8 +25,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-8 is a root stage
-  Stage-7 depends on stages: Stage-8
-  Stage-3 depends on stages: Stage-7
+  Stage-3 depends on stages: Stage-8
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -83,7 +82,7 @@ STAGE PLANS:
                     1 [class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[_col0]()]
                   Position of Big Table: 0
 
-  Stage: Stage-7
+  Stage: Stage-3
     Map Reduce
       Alias -> Map Operator Tree:
         src2:src 
@@ -137,25 +136,14 @@ STAGE PLANS:
                         bucketGroup: false
                         mode: hash
                         outputColumnNames: _col0
-                        File Output Operator
-                          compressed: false
-                          GlobalTableId: 0
-                          table:
-                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        Reduce Output Operator
+                          sort order: 
+                          tag: -1
+                          value expressions:
+                                expr: _col0
+                                type: bigint
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-3
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              sort order: 
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: bigint
       Reduce Operator Tree:
         Group By Operator
           aggregations:

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join15.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join15.q.out?rev=1509082&r1=1509081&r2=1509082&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join15.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join15.q.out Thu Aug  1 05:46:53 2013
@@ -19,8 +19,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-6 is a root stage
-  Stage-5 depends on stages: Stage-6
-  Stage-2 depends on stages: Stage-5
+  Stage-2 depends on stages: Stage-6
   Stage-3 depends on stages: Stage-2
   Stage-0 is a root stage
 
@@ -45,7 +44,7 @@ STAGE PLANS:
                 1 [Column[key]]
               Position of Big Table: 1
 
-  Stage: Stage-5
+  Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
         a:src2 
@@ -74,40 +73,29 @@ STAGE PLANS:
                       expr: _col5
                       type: string
                 outputColumnNames: _col0, _col1, _col2, _col3
-                File Output Operator
-                  compressed: false
-                  GlobalTableId: 0
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                        expr: _col1
+                        type: string
+                        expr: _col2
+                        type: string
+                        expr: _col3
+                        type: string
+                  sort order: ++++
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: string
+                        expr: _col1
+                        type: string
+                        expr: _col2
+                        type: string
+                        expr: _col3
+                        type: string
       Local Work:
         Map Reduce Local Work
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-                    expr: _col2
-                    type: string
-                    expr: _col3
-                    type: string
-              sort order: ++++
-              tag: -1
-              value expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-                    expr: _col2
-                    type: string
-                    expr: _col3
-                    type: string
       Reduce Operator Tree:
         Extract
           Select Operator