Posted to commits@hive.apache.org by gu...@apache.org on 2013/07/29 23:08:19 UTC

svn commit: r1508202 [9/48] - in /hive/branches/tez: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/metrics/ common/src/java/org/apache/hadoop/hive/conf/ common/src/te...

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Mon Jul 29 21:08:03 2013
@@ -84,7 +84,7 @@ public class GenMRTableScan1 implements 
       if (currOp == op) {
         String currAliasId = alias;
         ctx.setCurrAliasId(currAliasId);
-        mapCurrCtx.put(op, new GenMapRedCtx(currTask, currTopOp, currAliasId));
+        mapCurrCtx.put(op, new GenMapRedCtx(currTask, currAliasId));
 
         QBParseInfo parseInfo = parseCtx.getQB().getParseInfo();
         if (parseInfo.isAnalyzeCommand()) {
@@ -117,7 +117,10 @@ public class GenMRTableScan1 implements 
             handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
           }
 
-          currWork.setGatheringStats(true);
+          currWork.getMapWork().setGatheringStats(true);
+          if (currWork.getReduceWork() != null) {
+            currWork.getReduceWork().setGatheringStats(true);
+          }
           // NOTE: here we should use the new partition predicate pushdown API to get a list of pruned partitions,
           // and pass it to setTaskPlan as the last parameter
           Set<Partition> confirmedPartns = new HashSet<Partition>();
@@ -139,12 +142,12 @@ public class GenMRTableScan1 implements 
             Table source = parseCtx.getQB().getMetaData().getTableForAlias(alias);
             PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns,
                 new HashSet<Partition>(), null);
-            GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currWork, false, ctx, partList);
+            GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
           } else { // non-partitioned table
-            GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currWork, false, ctx);
+            GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
           }
         }
-        return null;
+        return true;
       }
     }
     assert false;
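
The stats-gathering hunk above shows this commit's central refactoring: MapredWork is split into an always-present MapWork plus an optional ReduceWork, so reduce-side settings are applied only when a reduce phase actually exists. A minimal sketch of the resulting shape, using simplified stand-in classes rather than the real Hive declarations:

    // Simplified stand-ins for the refactored plan classes; only the
    // methods exercised by the hunk above are modeled here.
    public class MapredWorkSplitSketch {

      static class MapWork {
        private boolean gatheringStats;
        void setGatheringStats(boolean b) { gatheringStats = b; }
      }

      static class ReduceWork {
        private boolean gatheringStats;
        void setGatheringStats(boolean b) { gatheringStats = b; }
      }

      static class MapredWork {
        private final MapWork mapWork = new MapWork(); // always present
        private ReduceWork reduceWork;                 // null for map-only jobs
        MapWork getMapWork() { return mapWork; }
        ReduceWork getReduceWork() { return reduceWork; }
        void setReduceWork(ReduceWork rw) { reduceWork = rw; }
      }

      public static void main(String[] args) {
        MapredWork currWork = new MapredWork();
        currWork.getMapWork().setGatheringStats(true);
        if (currWork.getReduceWork() != null) {        // guard as in the diff
          currWork.getReduceWork().setGatheringStats(true);
        }
      }
    }
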

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Mon Jul 29 21:08:03 2013
@@ -82,14 +82,13 @@ public class GenMRUnion1 implements Node
     UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
     ctx.getMapCurrCtx().put(
         (Operator<? extends OperatorDesc>) union,
-        new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+        new GenMapRedCtx(ctx.getCurrTask(),
             ctx.getCurrAliasId()));
 
     // if the union is the first time seen, set current task to GenMRUnionCtx
     uCtxTask = ctx.getUnionTask(union);
     if (uCtxTask == null) {
-      uCtxTask = new GenMRUnionCtx();
-      uCtxTask.setUTask(ctx.getCurrTask());
+      uCtxTask = new GenMRUnionCtx(ctx.getCurrTask());
       ctx.setUnionTask(union, uCtxTask);
     }
 
@@ -101,7 +100,7 @@ public class GenMRUnion1 implements Node
       }
     }
 
-    return null;
+    return true;
   }
 
   /**
@@ -192,14 +191,11 @@ public class GenMRUnion1 implements Node
     // The current plan can be thrown away after being merged with the union
     // plan
     Task<? extends Serializable> uTask = uCtxTask.getUTask();
-    MapredWork plan = (MapredWork) uTask.getWork();
     ctx.setCurrTask(uTask);
-    List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
     Operator<? extends OperatorDesc> topOp = ctx.getCurrTopOp();
-    if (!seenOps.contains(topOp) && topOp != null) {
-      seenOps.add(topOp);
+    if (topOp != null && !ctx.isSeenOp(uTask, topOp)) {
       GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx
-          .getCurrTopOp(), plan, false, ctx);
+          .getCurrTopOp(), uTask, false, ctx);
     }
   }
 
@@ -226,6 +222,14 @@ public class GenMRUnion1 implements Node
     // future
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
 
+    if (union.getConf().isAllInputsInSameReducer()) {
+      // All inputs of this UnionOperator are in the same Reducer.
+      // We do not need to break the operator tree.
+      mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
+        new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
+      return null;
+    }
+
     UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
 
     ctx.setCurrUnionOp(union);
@@ -246,10 +250,9 @@ public class GenMRUnion1 implements Node
     // union is encountered for the first time
     GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
     if (uCtxTask == null) {
-      uCtxTask = new GenMRUnionCtx();
       uPlan = GenMapRedUtils.getMapRedWork(parseCtx);
       uTask = TaskFactory.get(uPlan, parseCtx.getConf());
-      uCtxTask.setUTask(uTask);
+      uCtxTask = new GenMRUnionCtx(uTask);
       ctx.setUnionTask(union, uCtxTask);
     }
     else {
@@ -284,9 +287,9 @@ public class GenMRUnion1 implements Node
     ctx.setCurrTask(uTask);
 
     mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
-        new GenMapRedCtx(ctx.getCurrTask(), null, null));
+        new GenMapRedCtx(ctx.getCurrTask(), null));
 
-    return null;
+    return true;
   }
 
   private boolean shouldBeRootTask(
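
Two patterns recur in this file: GenMRUnionCtx now receives its union task through the constructor instead of a post-construction setUTask call, and the processors return true instead of null so the walker can tell a handled branch from an unhandled one (the new isAllInputsInSameReducer early exit still returns null because it leaves the operator tree unbroken). A hypothetical miniature of the constructor-injection half:

    public class UnionCtxSketch {

      static class Task { }                  // placeholder for ql.exec.Task

      static class GenMRUnionCtx {
        private final Task uTask;            // can now be final
        GenMRUnionCtx(Task uTask) { this.uTask = uTask; }
        Task getUTask() { return uTask; }
      }

      public static void main(String[] args) {
        // Before: uCtxTask = new GenMRUnionCtx(); uCtxTask.setUTask(currTask);
        // After: the context can never be observed without its task.
        Task currTask = new Task();
        GenMRUnionCtx uCtxTask = new GenMRUnionCtx(currTask);
        System.out.println(uCtxTask.getUTask() == currTask);   // true
      }
    }
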

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jul 29 21:08:03 2013
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -43,6 +44,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -60,12 +63,14 @@ import org.apache.hadoop.hive.ql.plan.Fe
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 
@@ -80,6 +85,10 @@ public final class GenMapRedUtils {
     LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
   }
 
+  private static boolean needsTagging(ReduceWork rWork) {
+    return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
+         rWork.getReducer().getClass() == DemuxOperator.class);
+  }
   /**
    * Initialize the current plan by adding it to root tasks.
    *
@@ -101,29 +110,21 @@ public final class GenMapRedUtils {
     Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     opTaskMap.put(reducer, currTask);
-    plan.setReducer(reducer);
+    plan.setReduceWork(new ReduceWork());
+    plan.getReduceWork().setReducer(reducer);
     ReduceSinkDesc desc = op.getConf();
 
-    plan.setNumReduceTasks(desc.getNumReducers());
-
-    List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
+    plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
 
-    if (!rootTasks.contains(currTask)
-        && (currTask.getParentTasks() == null
-            || currTask.getParentTasks().isEmpty())) {
-      rootTasks.add(currTask);
-    }
-    if (reducer.getClass() == JoinOperator.class) {
-      plan.setNeedsTagging(true);
+    if (needsTagging(plan.getReduceWork())) {
+      plan.getReduceWork().setNeedsTagging(true);
     }
 
     assert currTopOp != null;
-    List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
     String currAliasId = opProcCtx.getCurrAliasId();
 
-    if (!seenOps.contains(currTopOp)) {
-      seenOps.add(currTopOp);
-      setTaskPlan(currAliasId, currTopOp, plan, false, opProcCtx);
+    if (!opProcCtx.isSeenOp(currTask, currTopOp)) {
+      setTaskPlan(currAliasId, currTopOp, currTask, false, opProcCtx);
     }
 
     currTopOp = null;
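
Throughout this file the shared seenOps list gives way to per-task bookkeeping: setTaskPlan records the top operator against the task it was wired into (see the addSeenOp call at its end, further down), and callers ask isSeenOp(task, op) instead of consulting a global list, so one TableScan can legitimately feed several tasks. A hedged sketch of that bookkeeping, with assumed collection types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SeenOpsSketch {
      // task -> top operators already wired into that task's map work
      private final Map<Object, Set<Object>> seenOps = new HashMap<>();

      public boolean isSeenOp(Object task, Object op) {
        Set<Object> ops = seenOps.get(task);
        return ops != null && ops.contains(op);
      }

      public void addSeenOp(Object task, Object op) {
        seenOps.computeIfAbsent(task, k -> new HashSet<>()).add(op);
      }
    }
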
@@ -153,29 +154,30 @@ public final class GenMapRedUtils {
         opProcCtx.getOpTaskMap();
 
     opTaskMap.put(reducer, unionTask);
-    plan.setReducer(reducer);
+
+    plan.setReduceWork(new ReduceWork());
+    plan.getReduceWork().setReducer(reducer);
     ReduceSinkDesc desc = op.getConf();
 
-    plan.setNumReduceTasks(desc.getNumReducers());
+    plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
 
-    if (reducer.getClass() == JoinOperator.class) {
-      plan.setNeedsTagging(true);
+    if (needsTagging(plan.getReduceWork())) {
+      plan.getReduceWork().setNeedsTagging(true);
     }
 
     initUnionPlan(opProcCtx, currUnionOp, unionTask, false);
   }
 
   private static void setUnionPlan(GenMRProcContext opProcCtx,
-      boolean local, MapredWork plan, GenMRUnionCtx uCtx,
+      boolean local, Task<? extends Serializable> currTask, GenMRUnionCtx uCtx,
       boolean mergeTask) throws SemanticException {
     Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     if (currTopOp != null) {
-      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
       String currAliasId = opProcCtx.getCurrAliasId();
-      if (!seenOps.contains(currTopOp) || mergeTask) {
-        seenOps.add(currTopOp);
-        setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+      if (mergeTask || !opProcCtx.isSeenOp(currTask, currTopOp)) {
+        setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
       }
       currTopOp = null;
       opProcCtx.setCurrTopOp(currTopOp);
@@ -191,16 +193,18 @@ public final class GenMapRedUtils {
         List<Operator<? extends OperatorDesc>> topOperators =
             uCtx.getListTopOperators();
 
+        MapredWork plan = (MapredWork) currTask.getWork();
         for (int pos = 0; pos < size; pos++) {
           String taskTmpDir = taskTmpDirLst.get(pos);
           TableDesc tt_desc = tt_descLst.get(pos);
-          if (plan.getPathToAliases().get(taskTmpDir) == null) {
-            plan.getPathToAliases().put(taskTmpDir,
+          MapWork mWork = plan.getMapWork();
+          if (mWork.getPathToAliases().get(taskTmpDir) == null) {
+            mWork.getPathToAliases().put(taskTmpDir,
                 new ArrayList<String>());
-            plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-            plan.getPathToPartitionInfo().put(taskTmpDir,
+            mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
+            mWork.getPathToPartitionInfo().put(taskTmpDir,
                 new PartitionDesc(tt_desc, null));
-            plan.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
+            mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
           }
         }
       }
@@ -214,14 +218,12 @@ public final class GenMapRedUtils {
   public static void initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp,
       Task<? extends Serializable> currTask, boolean local)
       throws SemanticException {
-    MapredWork plan = (MapredWork) currTask.getWork();
-
     // In case of lateral views followed by a join, the same tree
     // can be traversed more than once
     if (currUnionOp != null) {
       GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
       assert uCtx != null;
-      setUnionPlan(opProcCtx, local, plan, uCtx, false);
+      setUnionPlan(opProcCtx, local, currTask, uCtx, false);
     }
   }
 
@@ -233,12 +235,11 @@ public final class GenMapRedUtils {
       Task<? extends Serializable> currentUnionTask,
       Task<? extends Serializable> existingTask, boolean local)
       throws SemanticException {
-    MapredWork plan = (MapredWork) existingTask.getWork();
     assert currUnionOp != null;
     GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
     assert uCtx != null;
 
-    setUnionPlan(opProcCtx, local, plan, uCtx, true);
+    setUnionPlan(opProcCtx, local, existingTask, uCtx, true);
 
     List<Task<? extends Serializable>> parTasks = null;
     if (opProcCtx.getRootTasks().contains(currentUnionTask)) {
@@ -273,104 +274,108 @@ public final class GenMapRedUtils {
   }
 
   /**
-   * Merge the current task with the task for the current reducer.
+   * Merge the current task into the old task for the reducer
    *
-   * @param op
-   *          operator being processed
+   * @param currTask
+   *          the current task for the current reducer
    * @param oldTask
    *          the old task for the current reducer
-   * @param task
-   *          the current task for the current reducer
    * @param opProcCtx
    *          processing context
-   * @param pos
-   *          position of the parent in the stack
    */
-  public static void joinPlan(Operator<? extends OperatorDesc> op,
-      Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
-      GenMRProcContext opProcCtx, int pos, boolean split)
+  public static void joinPlan(Task<? extends Serializable> currTask,
+      Task<? extends Serializable> oldTask, GenMRProcContext opProcCtx)
       throws SemanticException {
-    Task<? extends Serializable> currTask = task;
-    MapredWork plan = (MapredWork) currTask.getWork();
+    assert currTask != null && oldTask != null;
+
     Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
     List<Task<? extends Serializable>> parTasks = null;
-
     // terminate the old task and make current task dependent on it
-    if (split) {
-      assert oldTask != null;
-      splitTasks(op, oldTask, currTask, opProcCtx, true, false, 0);
-    } else {
-      if ((oldTask != null) && (oldTask.getParentTasks() != null)
-          && !oldTask.getParentTasks().isEmpty()) {
-        parTasks = new ArrayList<Task<? extends Serializable>>();
-        parTasks.addAll(oldTask.getParentTasks());
-
-        Object[] parTaskArr = parTasks.toArray();
-        for (Object element : parTaskArr) {
-          ((Task<? extends Serializable>) element).removeDependentTask(oldTask);
-        }
+    if (currTask.getParentTasks() != null
+        && !currTask.getParentTasks().isEmpty()) {
+      parTasks = new ArrayList<Task<? extends Serializable>>();
+      parTasks.addAll(currTask.getParentTasks());
+
+      Object[] parTaskArr = parTasks.toArray();
+      for (Object element : parTaskArr) {
+        ((Task<? extends Serializable>) element).removeDependentTask(currTask);
       }
     }
 
     if (currTopOp != null) {
-      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
-      String currAliasId = opProcCtx.getCurrAliasId();
-
-      if (!seenOps.contains(currTopOp)) {
-        seenOps.add(currTopOp);
-        boolean local = false;
-        if (pos != -1) {
-          local = (pos == ((MapJoinDesc) op.getConf()).getPosBigTable()) ? false
-              : true;
-        }
-        setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-      }
-      currTopOp = null;
-      opProcCtx.setCurrTopOp(currTopOp);
+      mergeInput(currTopOp, opProcCtx, oldTask, false);
     }
 
-    if ((oldTask != null) && (parTasks != null)) {
+    if (parTasks != null) {
       for (Task<? extends Serializable> parTask : parTasks) {
-        parTask.addDependentTask(currTask);
-        if (opProcCtx.getRootTasks().contains(currTask)) {
-          opProcCtx.getRootTasks().remove(currTask);
-        }
+        parTask.addDependentTask(oldTask);
       }
     }
 
-    opProcCtx.setCurrTask(currTask);
+    if (oldTask instanceof MapRedTask && currTask instanceof MapRedTask) {
+      ((MapRedTask)currTask).getWork().getMapWork()
+        .mergingInto(((MapRedTask) oldTask).getWork().getMapWork());
+    }
+
+    opProcCtx.setCurrTopOp(null);
+    opProcCtx.setCurrTask(oldTask);
   }
 
   /**
-   * Split the current plan by creating a temporary destination.
+   * If currTopOp has not yet been registered as an input of the task, add it to the task
+   */
+  static boolean mergeInput(Operator<? extends OperatorDesc> currTopOp,
+      GenMRProcContext opProcCtx, Task<? extends Serializable> task, boolean local)
+      throws SemanticException {
+    if (!opProcCtx.isSeenOp(task, currTopOp)) {
+      String currAliasId = opProcCtx.getCurrAliasId();
+      setTaskPlan(currAliasId, currTopOp, task, local, opProcCtx);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Handles a cRS encountered in the pRS(parentTask)-cRS-OP(childTask) case.
+   * Splits and links the two tasks by a temporary file: pRS-FS / TS-cRS-OP
+   */
+  static void splitPlan(ReduceSinkOperator cRS,
+      Task<? extends Serializable> parentTask, Task<? extends Serializable> childTask,
+      GenMRProcContext opProcCtx) throws SemanticException {
+    assert parentTask != null && childTask != null;
+    splitTasks(cRS, parentTask, childTask, opProcCtx);
+  }
+
+  /**
+   * Handles a cRS encountered in the pOP(parentTask with RS)-cRS-cOP(noTask) case.
+   * Creates a new child task for cRS-cOP and links the two tasks by a temporary file: pOP-FS / TS-cRS-cOP
    *
-   * @param op
+   * @param cRS
    *          the reduce sink operator encountered
    * @param opProcCtx
    *          processing context
    */
-  public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
+  static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx)
       throws SemanticException {
     // Generate a new task
     ParseContext parseCtx = opProcCtx.getParseCtx();
-    MapredWork cplan = getMapRedWork(parseCtx);
-    Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+    Task<? extends Serializable> parentTask = opProcCtx.getCurrTask();
+
+    MapredWork childPlan = getMapRedWork(parseCtx);
+    Task<? extends Serializable> childTask = TaskFactory.get(childPlan, parseCtx
         .getConf());
-    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Operator<? extends OperatorDesc> reducer = cRS.getChildOperators().get(0);
 
     // Add the reducer
-    cplan.setReducer(reducer);
-    ReduceSinkDesc desc = op.getConf();
-
-    cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
+    ReduceWork rWork = new ReduceWork();
+    childPlan.setReduceWork(rWork);
+    rWork.setReducer(reducer);
+    ReduceSinkDesc desc = cRS.getConf();
+    childPlan.getReduceWork().setNumReduceTasks(new Integer(desc.getNumReducers()));
 
-    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-        opProcCtx.getOpTaskMap();
-    opTaskMap.put(reducer, redTask);
-    Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+    opProcCtx.getOpTaskMap().put(reducer, childTask);
 
-    splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
-    opProcCtx.getRootOps().add(op);
+    splitTasks(cRS, parentTask, childTask, opProcCtx);
   }
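
The rewritten joinPlan earlier in this hunk folds the current task into the pre-existing task for the reducer: the current task's parents are detached from it and re-attached to the old task, which then becomes the current task. A toy version of that rewiring, with a placeholder TaskNode type instead of Hive's Task:

    import java.util.ArrayList;
    import java.util.List;

    public class JoinPlanSketch {

      static class TaskNode {
        final List<TaskNode> parents = new ArrayList<>();
        final List<TaskNode> children = new ArrayList<>();

        void addDependentTask(TaskNode child) {
          children.add(child);
          child.parents.add(this);
        }
        void removeDependentTask(TaskNode child) {
          children.remove(child);
          child.parents.remove(this);
        }
      }

      // Re-point every parent of currTask at oldTask, as in joinPlan.
      static void mergeInto(TaskNode currTask, TaskNode oldTask) {
        List<TaskNode> parTasks = new ArrayList<>(currTask.parents);
        for (TaskNode parent : parTasks) {
          parent.removeDependentTask(currTask);
        }
        for (TaskNode parent : parTasks) {
          parent.addDependentTask(oldTask);
        }
      }
    }
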
 
   /**
@@ -388,9 +393,9 @@ public final class GenMapRedUtils {
    *          processing context
    */
   public static void setTaskPlan(String alias_id,
-      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx) throws SemanticException {
-    setTaskPlan(alias_id, topOp, plan, local, opProcCtx, null);
+    setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
   }
 
   private static ReadEntity getParentViewInfo(String alias_id,
@@ -432,8 +437,9 @@ public final class GenMapRedUtils {
    *          pruned partition list. If it is null it will be computed on-the-fly.
    */
   public static void setTaskPlan(String alias_id,
-      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
+    MapWork plan = ((MapredWork) task.getWork()).getMapWork();
     ParseContext parseCtx = opProcCtx.getParseCtx();
     Set<ReadEntity> inputs = opProcCtx.getInputs();
 
@@ -488,6 +494,15 @@ public final class GenMapRedUtils {
 
     }
 
+    Map<String, String> props = parseCtx.getTopToProps().get(topOp);
+    if (props != null) {
+      Properties target = aliasPartnDesc.getProperties();
+      if (target == null) {
+        aliasPartnDesc.setProperties(target = new Properties());
+      }
+      target.putAll(props);
+    }
+
     plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
 
     long sizeNeeded = Integer.MAX_VALUE;
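
The added block merges operator-level properties from topToProps into the alias's partition descriptor, creating the Properties object lazily on first use; the same idiom is repeated for tblDesc in a later hunk. A small sketch of the lazy merge, using a hypothetical holder class:

    import java.util.Map;
    import java.util.Properties;

    public class PropsMergeSketch {
      private Properties properties;                   // may start out null

      Properties getProperties() { return properties; }
      void setProperties(Properties p) { properties = p; }

      void mergeFrom(Map<String, String> props) {
        if (props == null) {
          return;                                      // nothing to merge
        }
        Properties target = getProperties();
        if (target == null) {
          setProperties(target = new Properties());    // create on first use
        }
        target.putAll(props);                          // merged keys overwrite
      }
    }
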
@@ -608,6 +623,14 @@ public final class GenMapRedUtils {
         tblDesc = Utilities.getTableDesc(part.getTable());
       }
 
+      if (props != null) {
+        Properties target = tblDesc.getProperties();
+        if (target == null) {
+          tblDesc.setProperties(target = new Properties());
+        }
+        target.putAll(props);
+      }
+
       for (Path p : paths) {
         if (p == null) {
           continue;
@@ -681,6 +704,7 @@ public final class GenMapRedUtils {
       }
       plan.setMapLocalWork(localPlan);
     }
+    opProcCtx.addSeenOp(task, topOp);
   }
 
   /**
@@ -698,7 +722,7 @@ public final class GenMapRedUtils {
    *          table descriptor
    */
   public static void setTaskPlan(String path, String alias,
-      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
       TableDesc tt_desc) throws SemanticException {
 
     if (path == null || alias == null) {
@@ -737,7 +761,7 @@ public final class GenMapRedUtils {
    * @param topOp
    *          current top operator in the path
    */
-  public static void setKeyAndValueDesc(MapredWork plan,
+  public static void setKeyAndValueDesc(ReduceWork plan,
       Operator<? extends OperatorDesc> topOp) {
     if (topOp == null) {
       return;
@@ -778,12 +802,12 @@ public final class GenMapRedUtils {
       }
     } else if (task instanceof ExecDriver) {
       MapredWork work = (MapredWork) task.getWork();
-      work.deriveExplainAttributes();
+      work.getMapWork().deriveExplainAttributes();
       HashMap<String, Operator<? extends OperatorDesc>> opMap = work
-          .getAliasToWork();
+          .getMapWork().getAliasToWork();
       if (opMap != null && !opMap.isEmpty()) {
         for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          setKeyAndValueDesc(work, op);
+          setKeyAndValueDesc(work.getReduceWork(), op);
         }
       }
     }
@@ -804,7 +828,7 @@ public final class GenMapRedUtils {
    */
   public static MapredWork getMapRedWork(ParseContext parseCtx) {
     MapredWork work = getMapRedWorkFromConf(parseCtx.getConf());
-    work.setNameToSplitSample(parseCtx.getNameToSplitSample());
+    work.getMapWork().setNameToSplitSample(parseCtx.getNameToSplitSample());
     return work;
   }
 
@@ -815,7 +839,8 @@ public final class GenMapRedUtils {
    * @return the new plan
    */
   public static MapredWork getMapRedWorkFromConf(HiveConf conf) {
-    MapredWork work = new MapredWork();
+    MapredWork mrWork = new MapredWork();
+    MapWork work = mrWork.getMapWork();
 
     boolean mapperCannotSpanPartns =
         conf.getBoolVar(
@@ -824,11 +849,9 @@ public final class GenMapRedUtils {
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
-    work.setTagToValueDesc(new ArrayList<TableDesc>());
-    work.setReducer(null);
     work.setHadoopSupportsSplittable(
         conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
-    return work;
+    return mrWork;
   }
 
   /**
@@ -851,20 +874,20 @@ public final class GenMapRedUtils {
 
   @SuppressWarnings("nls")
   /**
-   * Merge the tasks - by creating a temporary file between them.
+   * Split two tasks by creating a temporary file between them.
+   *
    * @param op reduce sink operator being processed
-   * @param oldTask the parent task
-   * @param task the child task
+   * @param parentTask the parent task
+   * @param childTask the child task
    * @param opProcCtx context
-   * @param setReducer does the reducer needs to be set
-   * @param pos position of the parent
    **/
-  public static void splitTasks(Operator<? extends OperatorDesc> op,
-      Task<? extends Serializable> parentTask,
-      Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
-      boolean setReducer, boolean local, int posn) throws SemanticException {
-    childTask.getWork();
-    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+  private static void splitTasks(ReduceSinkOperator op,
+      Task<? extends Serializable> parentTask, Task<? extends Serializable> childTask,
+      GenMRProcContext opProcCtx) throws SemanticException {
+    if (op.getNumParent() != 1) {
+      throw new IllegalStateException("Expecting operator " + op + " to have one parent. " +
+          "But found multiple parents : " + op.getParentOperators());
+    }
 
     ParseContext parseCtx = opProcCtx.getParseCtx();
     parentTask.addDependentTask(childTask);
@@ -880,7 +903,7 @@ public final class GenMapRedUtils {
     Context baseCtx = parseCtx.getContext();
     String taskTmpDir = baseCtx.getMRTmpFileURI();
 
-    Operator<? extends OperatorDesc> parent = op.getParentOperators().get(posn);
+    Operator<? extends OperatorDesc> parent = op.getParentOperators().get(0);
     TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
         .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
 
@@ -922,40 +945,46 @@ public final class GenMapRedUtils {
     childOpList = new ArrayList<Operator<? extends OperatorDesc>>();
     childOpList.add(op);
     ts_op.setChildOperators(childOpList);
-    op.getParentOperators().set(posn, ts_op);
+    op.getParentOperators().set(0, ts_op);
 
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
         opProcCtx.getMapCurrCtx();
-    mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
+    mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null));
 
     String streamDesc = taskTmpDir;
     MapredWork cplan = (MapredWork) childTask.getWork();
 
-    if (setReducer) {
-      Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
 
-      if (reducer.getClass() == JoinOperator.class) {
-        String origStreamDesc;
-        streamDesc = "$INTNAME";
-        origStreamDesc = streamDesc;
-        int pos = 0;
-        while (cplan.getAliasToWork().get(streamDesc) != null) {
-          streamDesc = origStreamDesc.concat(String.valueOf(++pos));
-        }
+    if (needsTagging(cplan.getReduceWork())) {
+      String origStreamDesc;
+      streamDesc = "$INTNAME";
+      origStreamDesc = streamDesc;
+      int pos = 0;
+      while (cplan.getMapWork().getAliasToWork().get(streamDesc) != null) {
+        streamDesc = origStreamDesc.concat(String.valueOf(++pos));
       }
 
       // TODO: Allocate work to remove the temporary files and make that
       // dependent on the redTask
-      if (reducer.getClass() == JoinOperator.class) {
-        cplan.setNeedsTagging(true);
-      }
+      cplan.getReduceWork().setNeedsTagging(true);
     }
 
     // Add the path to alias mapping
-    setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
+    setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan.getMapWork(), false, tt_desc);
     opProcCtx.setCurrTopOp(null);
     opProcCtx.setCurrAliasId(null);
     opProcCtx.setCurrTask(childTask);
+    opProcCtx.addRootIfPossible(parentTask);
+  }
+
+  static boolean hasBranchFinished(Object... children) {
+    for (Object child : children) {
+      if (child == null) {
+        return false;
+      }
+    }
+    return true;
   }
 
   private GenMapRedUtils() {
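
splitTasks, now private and specialized to a single-parent ReduceSinkOperator, performs the temporary-file split described in its Javadoc: the parent task ends in a FileSink writing a scratch directory, and the child task starts with a TableScan reading it back. A rough sketch of the operator-tree surgery, with a placeholder Op type standing in for Hive's Operator hierarchy:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitTasksSketch {

      static class Op {
        final String name;
        final List<Op> parents = new ArrayList<>();
        final List<Op> children = new ArrayList<>();
        Op(String name) { this.name = name; }
      }

      // Cut between rs and its single parent: the parent now feeds a
      // FileSink (fs) into taskTmpDir, and a TableScan (ts) over
      // taskTmpDir becomes the sole parent of rs in the child task.
      static void split(Op rs, Op fs, Op ts, String taskTmpDir) {
        if (rs.parents.size() != 1) {                  // mirrors the new check
          throw new IllegalStateException("expecting one parent: " + rs.parents);
        }
        Op parent = rs.parents.get(0);
        parent.children.set(parent.children.indexOf(rs), fs);
        fs.parents.add(parent);                        // parent -> FS(taskTmpDir)
        ts.children.add(rs);                           // TS(taskTmpDir) -> rs
        rs.parents.set(0, ts);
      }
    }
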

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Mon Jul 29 21:08:03 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -77,7 +78,7 @@ public final class MapJoinFactory {
    */
   private static class TableScanMapJoinProcessor implements NodeProcessor {
 
-    public static void setupBucketMapJoinInfo(MapredWork plan,
+    public static void setupBucketMapJoinInfo(MapWork plan,
         AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
       if (currMapJoinOp != null) {
         Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
@@ -144,38 +145,16 @@ public final class MapJoinFactory {
      *          position of the parent
      */
     private static void initMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
-        GenMRProcContext opProcCtx, int pos)
+        Task<? extends Serializable> currTask,
+        GenMRProcContext opProcCtx, boolean local)
         throws SemanticException {
-      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
-          opProcCtx.getMapCurrCtx();
-      int parentPos = (pos == -1) ? 0 : pos;
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
-          parentPos));
-      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-      MapredWork plan = (MapredWork) currTask.getWork();
-      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-          opProcCtx.getOpTaskMap();
-      Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
-
-      MapJoinDesc desc = (MapJoinDesc) op.getConf();
 
       // The map is overloaded to keep track of mapjoins also
-      opTaskMap.put(op, currTask);
-
-      List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-      if(!rootTasks.contains(currTask)
-         && (currTask.getParentTasks() == null
-             || currTask.getParentTasks().isEmpty())) {
-        rootTasks.add(currTask);
-      }
-
-      assert currTopOp != null;
-      opProcCtx.getSeenOps().add(currTopOp);
+      opProcCtx.getOpTaskMap().put(op, currTask);
 
+      Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
       String currAliasId = opProcCtx.getCurrAliasId();
-      boolean local = (pos == desc.getPosBigTable()) ? false : true;
-      GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-      setupBucketMapJoinInfo(plan, op);
+      GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
     }
 
     /**
@@ -191,29 +170,12 @@ public final class MapJoinFactory {
      * @param pos
      *          position of the parent in the stack
      */
-    public static void joinMapJoinPlan(AbstractMapJoinOperator<? extends OperatorDesc> op,
+    private static void joinMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
         Task<? extends Serializable> oldTask,
-        GenMRProcContext opProcCtx, int pos)
+        GenMRProcContext opProcCtx, boolean local)
         throws SemanticException {
-      MapredWork plan = (MapredWork) oldTask.getWork();
       Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
-
-      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
-      String currAliasId = opProcCtx.getCurrAliasId();
-
-      if (!seenOps.contains(currTopOp)) {
-        seenOps.add(currTopOp);
-        boolean local = false;
-        if (pos != -1) {
-          local = (pos == ((MapJoinDesc) op.getConf()).getPosBigTable()) ? false
-              : true;
-        }
-        GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-        setupBucketMapJoinInfo(plan, op);
-      }
-      currTopOp = null;
-      opProcCtx.setCurrTopOp(currTopOp);
-      opProcCtx.setCurrTask(oldTask);
+      GenMapRedUtils.mergeInput(currTopOp, opProcCtx, oldTask, local);
     }
 
     /*
@@ -236,17 +198,14 @@ public final class MapJoinFactory {
 
       Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
           .getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
-          pos));
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
       Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       MapredWork currPlan = (MapredWork) currTask.getWork();
-      Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
       String currAliasId = mapredCtx.getCurrAliasId();
       HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
           ctx.getOpTaskMap();
-      Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
+      Task<? extends Serializable> oldTask = opTaskMap.get(mapJoin);
 
-      ctx.setCurrTopOp(currTopOp);
       ctx.setCurrAliasId(currAliasId);
       ctx.setCurrTask(currTask);
 
@@ -254,20 +213,23 @@ public final class MapJoinFactory {
       // If we are seeing this mapjoin for the second or later time, then at least one of the
       // branches for this mapjoin has been encountered. Join the plan with the plan created
       // the first time.
-      if (opMapTask == null) {
-        assert currPlan.getReducer() == null;
-        initMapJoinPlan(mapJoin, ctx, pos);
+      boolean local = pos != mapJoin.getConf().getPosBigTable();
+      if (oldTask == null) {
+        assert currPlan.getReduceWork() == null;
+        initMapJoinPlan(mapJoin, currTask, ctx, local);
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
-        joinMapJoinPlan(mapJoin, opMapTask, ctx, pos);
-        currTask = opMapTask;
-        ctx.setCurrTask(currTask);
+        joinMapJoinPlan(mapJoin, oldTask, ctx, local);
+        ctx.setCurrTask(currTask = oldTask);
       }
+      MapredWork plan = (MapredWork) currTask.getWork();
+      setupBucketMapJoinInfo(plan.getMapWork(), mapJoin);
+
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
 
-      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
-          .getCurrTopOp(), ctx.getCurrAliasId()));
-      return null;
+      // local aliases need not hand the context over any further
+      return !local;
     }
   }
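
The rewritten processor computes local exactly once from the big-table position (only the big table is streamed; every other input is a "local" small table destined for a hash table) and returns !local, so the walker hands the traversal context onward only along the big-table branch. A toy illustration with assumed positions:

    public class MapJoinLocalSketch {
      public static void main(String[] args) {
        int posBigTable = 1;                    // assumed for illustration
        for (int pos = 0; pos < 3; pos++) {
          boolean local = pos != posBigTable;   // small tables are local
          System.out.println("input " + pos + ": local=" + local
              + ", hand over context=" + !local);
        }
      }
    }
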
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jul 29 21:08:03 2013
@@ -134,7 +134,7 @@ public class MapJoinProcessor implements
         new LinkedHashMap<String, FetchWork>());
 
     for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
-      newWork.getAliasToWork().entrySet()) {
+      newWork.getMapWork().getAliasToWork().entrySet()) {
       String alias = entry.getKey();
       Operator<? extends OperatorDesc> op = entry.getValue();
 
@@ -162,7 +162,7 @@ public class MapJoinProcessor implements
       smallTableAliasList.add(alias);
       // get input path and remove this alias from pathToAlias
       // because this file will be fetched by fetch operator
-      LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getPathToAliases();
+      LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases();
 
       // keep record all the input path for this alias
       HashSet<String> pathSet = new HashSet<String>();
@@ -193,7 +193,7 @@ public class MapJoinProcessor implements
       List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
 
       for (String tablePath : pathSet) {
-        PartitionDesc partitionDesc = newWork.getPathToPartitionInfo().get(tablePath);
+        PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath);
         // create fetchwork for non partitioned table
         if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) {
           fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc());
@@ -205,7 +205,7 @@ public class MapJoinProcessor implements
       }
       // create fetchwork for partitioned table
       if (fetchWork == null) {
-        TableDesc table = newWork.getAliasToPartnInfo().get(alias).getTableDesc();
+        TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
         fetchWork = new FetchWork(partDir, partDesc, table);
       }
       // set alias to fetch work
@@ -213,13 +213,13 @@ public class MapJoinProcessor implements
     }
     // remove small table alias from aliasToWork; avoid concurrent modification
     for (String alias : smallTableAliasList) {
-      newWork.getAliasToWork().remove(alias);
+      newWork.getMapWork().getAliasToWork().remove(alias);
     }
 
     // set up local work
-    newWork.setMapLocalWork(newLocalWork);
+    newWork.getMapWork().setMapLocalWork(newLocalWork);
     // remove reducer
-    newWork.setReducer(null);
+    newWork.setReduceWork(null);
     // return the big table alias
     if (bigTableAlias == null) {
       throw new SemanticException("Big Table Alias is null");
@@ -240,8 +240,8 @@ public class MapJoinProcessor implements
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
-        newWork.getOpParseCtxMap();
-    QBJoinTree newJoinTree = newWork.getJoinTree();
+        newWork.getMapWork().getOpParseCtxMap();
+    QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
     // generate the map join operator; already checked the map join
     MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
         newJoinTree, mapJoinPos, true, false);
@@ -256,14 +256,15 @@ public class MapJoinProcessor implements
       String bigTableAlias = MapJoinProcessor
           .genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
       // clean up the mapred work
-      newWork.setOpParseCtxMap(null);
-      newWork.setJoinTree(null);
+      newWork.getMapWork().setOpParseCtxMap(null);
+      newWork.getMapWork().setJoinTree(null);
 
       return bigTableAlias;
 
     } catch (Exception e) {
       e.printStackTrace();
-      throw new SemanticException("Generate New MapJoin Opertor Exeception " + e.getMessage());
+      throw new SemanticException("Failed to generate new mapJoin operator " +
+          "by exception : " + e.getMessage());
     }
   }
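
genMapJoinLocalWork, whose call sites change above, moves every small-table alias out of the map-side aliasToWork and over to the fetch side of the local work, collecting the aliases first to avoid concurrent modification, as the comment in the hunk notes. A simplified, hypothetical model of that bookkeeping, with strings standing in for operator trees and fetch descriptors:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class LocalWorkSketch {
      public static void main(String[] args) {
        Map<String, String> aliasToWork = new LinkedHashMap<>();
        aliasToWork.put("big", "TS-big");
        aliasToWork.put("small", "TS-small");
        Map<String, String> aliasToFetchWork = new LinkedHashMap<>();
        String bigTableAlias = "big";

        // collect first, then remove, to avoid concurrent modification
        List<String> smallTableAliasList = new ArrayList<>();
        for (String alias : aliasToWork.keySet()) {
          if (!alias.equals(bigTableAlias)) {
            smallTableAliasList.add(alias);
          }
        }
        for (String alias : smallTableAliasList) {
          aliasToFetchWork.put(alias, "fetch:" + alias);
          aliasToWork.remove(alias);
        }
        System.out.println(aliasToWork + " / " + aliasToFetchWork);
      }
    }
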
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Jul 29 21:08:03 2013
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.correlation.CorrelationOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication;
 import org.apache.hadoop.hive.ql.optimizer.index.RewriteGBUsingIndex;
 import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
@@ -103,6 +105,11 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
       transformations.add(new GlobalLimitOptimizer());
     }
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+      transformations.add(new CorrelationOptimizer());
+    }
     transformations.add(new SimpleFetchOptimizer());  // must be called last
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java Mon Jul 29 21:08:03 2013
@@ -28,32 +28,22 @@ import org.apache.hadoop.hive.ql.metadat
  * partition pruned for the table scan and table alias.
  */
 public class PcrExprProcCtx implements NodeProcessorCtx {
+  /**
+   * The table alias that is being currently processed.
+   */
+  private final String tabAlias;
+  private final List<Partition> partList;
 
   public PcrExprProcCtx(String tabAlias, List<Partition> partList) {
-    super();
     this.tabAlias = tabAlias;
     this.partList = partList;
   }
 
-  /**
-   * The table alias that is being currently processed.
-   */
-  String tabAlias;
-  List<Partition> partList;
-
   public String getTabAlias() {
     return tabAlias;
   }
 
-  public void setTabAlias(String tabAlias) {
-    this.tabAlias = tabAlias;
-  }
-
   public List<Partition> getPartList() {
     return partList;
   }
-
-  public void setPartList(List<Partition> partList) {
-    this.partList = partList;
-  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Mon Jul 29 21:08:03 2013
@@ -364,16 +364,14 @@ public final class PcrExprProcFactory {
         Object... nodeOutputs) throws SemanticException {
       ExprNodeFieldDesc fnd = (ExprNodeFieldDesc) nd;
       boolean unknown = false;
-      int idx = 0;
       for (Object child : nodeOutputs) {
         NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
         if (wrapper.state == WalkState.UNKNOWN) {
           unknown = true;
+          break;
         }
       }
 
-      assert (idx == 0);
-
       if (unknown) {
         return new NodeInfoWrapper(WalkState.UNKNOWN, null, fnd);
       } else {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java Mon Jul 29 21:08:03 2013
@@ -27,14 +27,15 @@ import java.util.Stack;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 
 /**
  * Common iteration methods for converting joins and sort-merge joins.
@@ -119,7 +120,7 @@ public abstract class AbstractJoinTaskDi
     }
   }
 
-  public long getTotalKnownInputSize(Context context, MapredWork currWork,
+  public long getTotalKnownInputSize(Context context, MapWork currWork,
       Map<String, ArrayList<String>> pathToAliases,
       HashMap<String, Long> aliasToSize) throws SemanticException {
     try {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java Mon Jul 29 21:08:03 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.plan.Op
  */
 public class BucketingSortingCtx implements NodeProcessorCtx {
 
+  boolean disableBucketing;
+
   // A mapping from an operator to the columns by which it's output is bucketed
   Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
   // A mapping from a directory which a FileSinkOperator writes into to the columns by which that
@@ -48,7 +50,8 @@ public class BucketingSortingCtx impleme
   // output is sorted
   Map<String, List<SortCol>> sortedColsByDirectory;
 
-  public BucketingSortingCtx() {
+  public BucketingSortingCtx(boolean disableBucketing) {
+    this.disableBucketing = disableBucketing;
     this.bucketedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<BucketCol>>();
     this.bucketedColsByDirectory = new HashMap<String, List<BucketCol>>();
     this.sortedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<SortCol>>();
@@ -57,21 +60,25 @@ public class BucketingSortingCtx impleme
 
 
   public List<BucketCol> getBucketedCols(Operator<? extends OperatorDesc> op) {
-    return bucketedColsByOp.get(op);
+    return disableBucketing ? null : bucketedColsByOp.get(op);
   }
 
 
   public void setBucketedCols(Operator<? extends OperatorDesc> op, List<BucketCol> bucketCols) {
-    this.bucketedColsByOp.put(op, bucketCols);
+    if (!disableBucketing) {
+      bucketedColsByOp.put(op, bucketCols);
+    }
   }
 
   public Map<String, List<BucketCol>> getBucketedColsByDirectory() {
-    return bucketedColsByDirectory;
+    return disableBucketing ? null : bucketedColsByDirectory;
   }
 
 
   public void setBucketedColsByDirectory(Map<String, List<BucketCol>> bucketedColsByDirectory) {
-    this.bucketedColsByDirectory = bucketedColsByDirectory;
+    if (!disableBucketing) {
+      this.bucketedColsByDirectory = bucketedColsByDirectory;
+    }
   }
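
The new disableBucketing flag (set from getSamplingType() > 0 in BucketingSortingInferenceOptimizer below) effectively turns the context into a sink: reads report nothing and writes are dropped, so no bucketing metadata is inferred for sampled map work. A compact sketch of the guard, with assumed key and value types:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BucketingCtxSketch {
      private final boolean disableBucketing;
      private final Map<String, List<String>> bucketedColsByOp = new HashMap<>();

      public BucketingCtxSketch(boolean disableBucketing) {
        this.disableBucketing = disableBucketing;
      }

      public List<String> getBucketedCols(String op) {
        return disableBucketing ? null : bucketedColsByOp.get(op);
      }

      public void setBucketedCols(String op, List<String> cols) {
        if (!disableBucketing) {                 // drop writes when disabled
          bucketedColsByOp.put(op, cols);
        }
      }
    }
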
 
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java Mon Jul 29 21:08:03 2013
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Li
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -86,12 +86,14 @@ public class BucketingSortingInferenceOp
         continue;
       }
 
-      Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReducer();
-      if (reducer == null) {
+      if (mapRedTask.getWork().getReduceWork() == null) {
         continue;
       }
+      Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReduceWork().getReducer();
 
-      BucketingSortingCtx bCtx = new BucketingSortingCtx();
+      // uses sampling, which means it's not bucketed
+      boolean disableBucketing = mapRedTask.getWork().getMapWork().getSamplingType() > 0;
+      BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);
 
       // RuleRegExp rules are used to match operators anywhere in the tree
       // RuleExactMatch rules are used to specify exactly what the tree should look like
@@ -143,8 +145,8 @@ public class BucketingSortingInferenceOp
       topNodes.add(reducer);
       ogw.startWalking(topNodes, null);
 
-      mapRedTask.getWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
-      mapRedTask.getWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
+      mapRedTask.getWork().getMapWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
+      mapRedTask.getWork().getMapWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
     }
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jul 29 21:08:03 2013
@@ -34,12 +34,12 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
@@ -50,10 +50,12 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 
 /*
  * Convert tasks involving JOIN into MAPJOIN.
@@ -108,7 +110,7 @@ public class CommonJoinTaskDispatcher ex
   }
 
   // Get the position of the big table for this join operator and the given alias
-  private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
+  private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
       String alias) {
     Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
 
@@ -127,9 +129,9 @@ public class CommonJoinTaskDispatcher ex
    */
   private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
     MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
-    MapredWork work = task.getWork();
+    MapWork work = task.getWork().getMapWork();
     MapredLocalWork localWork = work.getMapLocalWork();
-    MapredWork childWork = childTask.getWork();
+    MapWork childWork = childTask.getWork().getMapWork();
     MapredLocalWork childLocalWork = childWork.getMapLocalWork();
 
     // Can this be merged
@@ -205,21 +207,27 @@ public class CommonJoinTaskDispatcher ex
       }
     }
 
+    // Merge the two trees: remove the FileSinkOperator from the first tree and
+    // pass what remains to the top of the second
     Operator<? extends Serializable> childAliasOp =
         childWork.getAliasToWork().values().iterator().next();
     if (fop.getParentOperators().size() > 1) {
       return;
     }
-
-    // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
-    // top of the second
     Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
-    parentFOp.getChildOperators().remove(fop);
-    parentFOp.getChildOperators().add(childAliasOp);
-    List<Operator<? extends OperatorDesc>> parentOps =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-    parentOps.add(parentFOp);
-    childAliasOp.setParentOperators(parentOps);
+    // remove the unnecessary TableScan
+    if (childAliasOp instanceof TableScanOperator) {
+      TableScanOperator tso = (TableScanOperator)childAliasOp;
+      if (tso.getNumChild() != 1) {
+        // shouldn't happen
+        return;
+      }
+      childAliasOp = tso.getChildOperators().get(0);
+      childAliasOp.replaceParent(tso, parentFOp);
+    } else {
+      childAliasOp.setParentOperators(Utilities.makeList(parentFOp));
+    }
+    parentFOp.replaceChild(fop, childAliasOp);
 
     work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
     for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
@@ -256,19 +264,26 @@ public class CommonJoinTaskDispatcher ex
    * @param childTask
    */
   private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
-    MapredWork childWork = childTask.getWork();
+    MapredWork mrChildWork = childTask.getWork();
+    ReduceWork childWork = childTask.getWork().getReduceWork();
+    if (childWork == null) {
+      return;
+    }
+
     Operator childReducer = childWork.getReducer();
     MapredWork work = task.getWork();
     if (childReducer == null) {
       return;
     }
-    work.setReducer(childReducer);
-    work.setNumReduceTasks(childWork.getNumReduceTasks());
-    work.setJoinTree(childWork.getJoinTree());
-    work.setNeedsTagging(childWork.getNeedsTagging());
+    ReduceWork rWork = new ReduceWork();
+    work.setReduceWork(rWork);
+    rWork.setReducer(childReducer);
+    rWork.setNumReduceTasks(childWork.getNumReduceTasks());
+    work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
+    rWork.setNeedsTagging(childWork.getNeedsTagging());
 
     // Make sure the key configuration is correct, clear and regenerate.
-    work.getTagToValueDesc().clear();
+    rWork.getTagToValueDesc().clear();
     GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
   }
 
@@ -303,10 +318,9 @@ public class CommonJoinTaskDispatcher ex
       return;
     }
     MapRedTask childTask = (MapRedTask) firstChildTask;
-    MapredWork mapJoinWork = mapJoinTask.getWork();
+    MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
     MapredWork childWork = childTask.getWork();
-    Operator childReducer = childWork.getReducer();
-    if (childReducer == null) {
+    if (childWork.getReduceWork() == null) {
       // Not a MR job, nothing to merge.
       return;
     }
@@ -316,7 +330,7 @@ public class CommonJoinTaskDispatcher ex
     if (aliasToWork.size() > 1) {
       return;
     }
-    Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+    Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
     if (childPathToAliases.size() > 1) {
       return;
     }
@@ -347,7 +361,7 @@ public class CommonJoinTaskDispatcher ex
     }
 
     MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
-    MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+    MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
 
     // Either of them should not be bucketed
     if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
@@ -355,12 +369,12 @@ public class CommonJoinTaskDispatcher ex
       return;
     }
 
-    if (childWork.getAliasToWork().size() > 1) {
+    if (childWork.getMapWork().getAliasToWork().size() > 1) {
       return;
     }
 
     Operator<? extends Serializable> childAliasOp =
-        childWork.getAliasToWork().values().iterator().next();
+        childWork.getMapWork().getAliasToWork().values().iterator().next();
     if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
       return;
     }
@@ -387,10 +401,10 @@ public class CommonJoinTaskDispatcher ex
     parentOps.add(parentFOp);
     childAliasOp.setParentOperators(parentOps);
 
-    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
-    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
+    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
         .entrySet()) {
-      if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+      if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
         mapJoinWork.getPathToPartitionInfo()
             .put(childWorkEntry.getKey(), childWorkEntry.getValue());
       }
@@ -416,6 +430,22 @@ public class CommonJoinTaskDispatcher ex
     copyReducerConf(mapJoinTask, childTask);
   }
 
+  public static boolean cannotConvert(String bigTableAlias,
+      Map<String, Long> aliasToSize, long aliasTotalKnownInputSize,
+      long thresholdOfSmallTblSizeSum) {
+    boolean ret = false;
+    Long aliasKnownSize = aliasToSize.get(bigTableAlias);
+    if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
+      long smallTblTotalKnownSize = aliasTotalKnownInputSize
+          - aliasKnownSize.longValue();
+      if (smallTblTotalKnownSize > thresholdOfSmallTblSizeSum) {
+        // this table is not a good candidate for the big table.
+        ret = true;
+      }
+    }
+    return ret;
+  }
+
   @Override
   public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
       ConditionalTask conditionalTask, Context context)
@@ -428,7 +458,7 @@ public class CommonJoinTaskDispatcher ex
     }
     currTask.setTaskTag(Task.COMMON_JOIN);
 
-    MapredWork currWork = currTask.getWork();
+    MapWork currWork = currTask.getWork().getMapWork();
 
     // create conditional work list and task list
     List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -519,7 +549,7 @@ public class CommonJoinTaskDispatcher ex
 
       if (convertJoinMapJoin) {
         // create map join task and set big table as bigTablePosition
-        MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
+        MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst();
 
         newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
         replaceTask(currTask, newTask, physicalContext);
@@ -555,23 +585,18 @@ public class CommonJoinTaskDispatcher ex
         }
         // deep copy a new mapred work from xml
         // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
-        String xml = currWork.toXML();
+        String xml = currTask.getWork().toXML();
         InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-        MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+        MapredWork newWork = Utilities.deserializeObject(in);
 
         // create map join task and set big table as i
         ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
         MapRedTask newTask = newTaskAlias.getFirst();
         bigTableAlias = newTaskAlias.getSecond();
 
-        Long aliasKnownSize = aliasToSize.get(bigTableAlias);
-        if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
-          long smallTblTotalKnownSize = aliasTotalKnownInputSize
-              - aliasKnownSize.longValue();
-          if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
-            // this table is not good to be a big table.
-            continue;
-          }
+        if (cannotConvert(bigTableAlias, aliasToSize,
+            aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) {
+          continue;
         }
 
         // add into conditional task
@@ -642,14 +667,15 @@ public class CommonJoinTaskDispatcher ex
   }
 
   private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
-    MapredWork work = task.getWork();
-    if (work == null) {
+    MapWork mWork = task.getWork().getMapWork();
+    ReduceWork rWork = task.getWork().getReduceWork();
+    if (rWork == null) {
       return null;
     }
-    Operator<? extends OperatorDesc> reducerOp = work.getReducer();
+    Operator<? extends OperatorDesc> reducerOp = rWork.getReducer();
     if (reducerOp instanceof JoinOperator) {
       /* Is any operator present, which prevents the conversion */
-      Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
+      Map<String, Operator<? extends OperatorDesc>> aliasToWork = mWork.getAliasToWork();
       for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
         if (!checkOperatorOKMapJoinConversion(op)) {
           return null;
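
The extracted cannotConvert helper above makes the big-table size check callable (and testable) outside the conversion loop. A hypothetical invocation with made-up sizes, to show the semantics: an alias is rejected as the big table only when the combined size of the remaining (small) tables exceeds the threshold.

    // assumes java.util.Map / java.util.HashMap are imported
    Map<String, Long> aliasToSize = new HashMap<String, Long>();
    aliasToSize.put("orders", 100L << 20);   // 100 MB candidate big table
    aliasToSize.put("dim", 4L << 20);        //   4 MB small table
    long aliasTotalKnownInputSize = 104L << 20;
    long threshold = 25L << 20;              // small-table size threshold from conf

    // remaining small-table bytes: 104 MB - 100 MB = 4 MB, which is <= 25 MB,
    // so "orders" is acceptable as the big table and the call returns false
    boolean rejected = CommonJoinTaskDispatcher.cannotConvert(
        "orders", aliasToSize, aliasTotalKnownInputSize, threshold);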

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Mon Jul 29 21:08:03 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -104,6 +104,7 @@ public final class GenMRSkewJoinProcesso
    * https://issues.apache.org/jira/browse/HIVE-964.
    *
    */
+  @SuppressWarnings("unchecked")
   public static void processSkewJoin(JoinOperator joinOp,
       Task<? extends Serializable> currTask, ParseContext parseCtx)
       throws SemanticException {
@@ -151,7 +152,7 @@ public final class GenMRSkewJoinProcesso
     List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
     MapredWork currPlan = (MapredWork) currTask.getWork();
 
-    TableDesc keyTblDesc = (TableDesc) currPlan.getKeyDesc().clone();
+    TableDesc keyTblDesc = (TableDesc) currPlan.getReduceWork().getKeyDesc().clone();
     List<String> joinKeys = Utilities
         .getColumnNames(keyTblDesc.getProperties());
     List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc
@@ -232,7 +233,7 @@ public final class GenMRSkewJoinProcesso
 
     for (int i = 0; i < numAliases - 1; i++) {
       Byte src = tags[i];
-      MapredWork newPlan = PlanUtils.getMapRedWork();
+      MapWork newPlan = PlanUtils.getMapRedWork().getMapWork();
 
       // This code has been only added for testing
       boolean mapperCannotSpanPartns =
@@ -246,7 +247,7 @@ public final class GenMRSkewJoinProcesso
         StringBuilder sb = new StringBuilder(xmlPlan);
         ByteArrayInputStream bis;
         bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
-        clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+        clonePlan = Utilities.deserializeObject(bis);
       } catch (UnsupportedEncodingException e) {
         throw new SemanticException(e);
       }
@@ -276,7 +277,7 @@ public final class GenMRSkewJoinProcesso
       newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
       newPlan.getAliasToPartnInfo().put(alias, part);
 
-      Operator<? extends OperatorDesc> reducer = clonePlan.getReducer();
+      Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
       assert reducer instanceof JoinOperator;
       JoinOperator cloneJoinOp = (JoinOperator) reducer;
 
@@ -328,16 +329,18 @@ public final class GenMRSkewJoinProcesso
       newPlan
           .setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
       newPlan.setInputformat(HiveInputFormat.class.getName());
-      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
-          newPlan, jc);
+
+      MapredWork w = new MapredWork();
+      w.setMapWork(newPlan);
+
+      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(w, jc);
       bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
       listWorks.add(skewJoinMapJoinTask.getWork());
       listTasks.add(skewJoinMapJoinTask);
     }
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
-    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork,
-        parseCtx.getConf());
+    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
     cndTsk.setListTasks(listTasks);
     cndTsk.setResolver(new ConditionalResolverSkewJoin());
     cndTsk
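
TaskFactory still keys on MapredWork, so a map-only plan assembled as a bare MapWork has to be wrapped before a task can be created, as the hunk above does for each skew-join map-join plan. The idiom, reduced to its essentials (jc is the JobConf already in scope here):

    MapWork newPlan = PlanUtils.getMapRedWork().getMapWork();
    // ... populate newPlan: aliasToWork, pathToPartitionInfo, input format ...

    MapredWork w = new MapredWork();   // container with no reduce stage
    w.setMapWork(newPlan);
    Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(w, jc);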

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java Mon Jul 29 21:08:03 2013
@@ -34,7 +34,7 @@ public class IndexWhereResolver implemen
     Dispatcher dispatcher = new IndexWhereTaskDispatcher(physicalContext);
     GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(physicalContext.rootTasks);
+    topNodes.addAll(physicalContext.getRootTasks());
     opGraphWalker.startWalking(topNodes, null);
 
     return physicalContext;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Mon Jul 29 21:08:03 2013
@@ -28,12 +28,12 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -48,8 +48,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import
-  org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -73,7 +72,7 @@ public class MapJoinResolver implements 
 
     // get all the tasks nodes from root task
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());
 
     // begin to walk through the task tree.
     ogw.startWalking(topNodes, null);
@@ -98,14 +97,14 @@ public class MapJoinResolver implements 
         ConditionalTask conditionalTask) throws SemanticException {
       // get current mapred work and its local work
       MapredWork mapredWork = (MapredWork) currTask.getWork();
-      MapredLocalWork localwork = mapredWork.getMapLocalWork();
+      MapredLocalWork localwork = mapredWork.getMapWork().getMapLocalWork();
       if (localwork != null) {
         // get the context info and set up the shared tmp URI
         Context ctx = physicalContext.getContext();
         String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
         localwork.setTmpFileURI(tmpFileURI);
         String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
-        mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
+        mapredWork.getMapWork().setTmpHDFSFileURI(hdfsTmpURI);
         // create a task for this local work; right now, this local work is shared
         // by the original MapredTask and this new generated MapredLocalTask.
         MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext
@@ -134,7 +133,7 @@ public class MapJoinResolver implements 
         newLocalWork.setTmpFileURI(tmpFileURI);
         newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
         newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly());
-        mapredWork.setMapLocalWork(newLocalWork);
+        mapredWork.getMapWork().setMapLocalWork(newLocalWork);
         // get all parent tasks
         List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
         currTask.setParentTasks(null);
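
Map-local work has moved with the rest of the map-side state, so both the lookup and the write-back of the shared tmp URIs now go through getMapWork(). In outline (ctx and currTask as in the dispatcher above):

    MapredWork mapredWork = (MapredWork) currTask.getWork();
    MapredLocalWork localwork = mapredWork.getMapWork().getMapLocalWork();
    if (localwork != null) {
      // tmp URIs are still derived from the context, but stored on MapWork
      mapredWork.getMapWork().setTmpHDFSFileURI(
          Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId()));
    }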

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java Mon Jul 29 21:08:03 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -171,7 +172,7 @@ public class MetadataOnlyOptimizer imple
     Dispatcher disp = new MetadataOnlyTaskDispatcher(pctx);
     GraphWalker ogw = new DefaultGraphWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
     return pctx;
   }
@@ -188,7 +189,7 @@ public class MetadataOnlyOptimizer imple
       physicalContext = context;
     }
 
-    private String getAliasForTableScanOperator(MapredWork work,
+    private String getAliasForTableScanOperator(MapWork work,
         TableScanOperator tso) {
 
       for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
@@ -211,7 +212,7 @@ public class MetadataOnlyOptimizer imple
       return desc;
     }
 
-    private List<String> getPathsForAlias(MapredWork work, String alias) {
+    private List<String> getPathsForAlias(MapWork work, String alias) {
       List<String> paths = new ArrayList<String>();
 
       for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
@@ -223,7 +224,7 @@ public class MetadataOnlyOptimizer imple
       return paths;
     }
 
-    private void processAlias(MapredWork work, String alias) {
+    private void processAlias(MapWork work, String alias) {
       // Change the alias partition desc
       PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
       changePartitionToMetadataOnly(aliasPartn);
@@ -247,12 +248,6 @@ public class MetadataOnlyOptimizer imple
       return partSpec.toString().replaceAll("[:/#\\?]", "_");
     }
 
-    private void convertToMetadataOnlyQuery(MapredWork work,
-        TableScanOperator tso) {
-      String alias = getAliasForTableScanOperator(work, tso);
-      processAlias(work, alias);
-    }
-
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
         throws SemanticException {
@@ -305,8 +300,10 @@ public class MetadataOnlyOptimizer imple
 
       while (iterator.hasNext()) {
         TableScanOperator tso = iterator.next();
-        LOG.info("Metadata only table scan for " + tso.getConf().getAlias());
-        convertToMetadataOnlyQuery((MapredWork) task.getWork(), tso);
+        MapWork work = ((MapredWork) task.getWork()).getMapWork();
+        String alias = getAliasForTableScanOperator(work, tso);
+        LOG.info("Metadata only table scan for " + alias);
+        processAlias(work, alias);
       }
 
       return null;
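
With the helpers retargeted at MapWork, the dispatcher resolves each table scan's alias against the map-side plan before rewriting, so the log line reports the alias actually being converted. The inlined conversion, in sketch form (tableScans standing in for the iterator above):

    MapWork work = ((MapredWork) task.getWork()).getMapWork();
    for (TableScanOperator tso : tableScans) {
      String alias = getAliasForTableScanOperator(work, tso);
      LOG.info("Metadata only table scan for " + alias);
      processAlias(work, alias);   // flips the alias's partitions to metadata-only
    }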

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java Mon Jul 29 21:08:03 2013
@@ -72,11 +72,27 @@ public class PhysicalContext {
     this.context = context;
   }
 
+  public List<Task<? extends Serializable>> getRootTasks() {
+    return rootTasks;
+  }
+
+  public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
+    this.rootTasks = rootTasks;
+  }
+
+  public Task<? extends Serializable> getFetchTask() {
+    return fetchTask;
+  }
+
+  public void setFetchTask(Task<? extends Serializable> fetchTask) {
+    this.fetchTask = fetchTask;
+  }
+
   public void addToRootTask(Task<? extends Serializable> tsk){
     rootTasks.add(tsk);
   }
+
   public void removeFromRootTask(Task<? extends Serializable> tsk){
     rootTasks.remove(tsk);
   }
-
 }
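
rootTasks and fetchTask were previously touched as bare fields (pctx.rootTasks); the accessors added here are what the resolvers elsewhere in this patch switch to, e.g.:

    ArrayList<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pctx.getRootTasks());
    ogw.startWalking(topNodes, null);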

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Mon Jul 29 21:08:03 2013
@@ -67,6 +67,9 @@ public class PhysicalOptimizer {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
       resolvers.add(new MetadataOnlyOptimizer());
     }
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY)) {
+      resolvers.add(new SamplingOptimizer());
+    }
 
     // Physical optimizers which follow this need to be careful not to invalidate the inferences
     // made by this optimizer. Only optimizers which depend on the results of this one should
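
The sampling optimizer is opt-in. Assuming HIVESAMPLINGFORORDERBY follows the usual ConfVars pattern (its definition is not part of this hunk), it would be enabled programmatically before building the optimizer:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY, true);
    // the existing (PhysicalContext, HiveConf) constructor then picks it up
    PhysicalOptimizer optimizer = new PhysicalOptimizer(physicalContext, conf);
    optimizer.optimize();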

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Mon Jul 29 21:08:03 2013
@@ -51,7 +51,7 @@ public class SkewJoinResolver implements
     Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
     GraphWalker ogw = new DefaultGraphWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
     return pctx;
   }
@@ -74,7 +74,7 @@ public class SkewJoinResolver implements
       Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
 
       if (!task.isMapRedTask() || task instanceof ConditionalTask
-          || ((MapredWork) task.getWork()).getReducer() == null) {
+          || ((MapredWork) task.getWork()).getReduceWork() == null) {
         return null;
       }
 
@@ -94,7 +94,9 @@ public class SkewJoinResolver implements
 
       // iterator the reducer operator tree
       ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.add(((MapredWork) task.getWork()).getReducer());
+      if (((MapredWork)task.getWork()).getReduceWork() != null) {
+        topNodes.add(((MapredWork) task.getWork()).getReduceWork().getReducer());
+      }
       ogw.startWalking(topNodes, null);
       return null;
     }
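
Both changes in this file reduce to the same rule under the split plan model: a task takes part in skew-join resolution only if its MapredWork actually carries a ReduceWork. The guard, isolated:

    MapredWork work = (MapredWork) task.getWork();
    ReduceWork rWork = work.getReduceWork();
    if (rWork == null) {
      return null;   // map-only job: no reducer, nothing to skew-resolve
    }
    topNodes.add(rWork.getReducer());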