Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [15/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Jan 21 10:37:58 2010
@@ -18,87 +18,99 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.Iterator;
-import java.util.List;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.io.Serializable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.plan.fetchWork;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.tableScanDesc;
-import org.apache.hadoop.hive.ql.plan.filterDesc.sampleDesc;
-import org.apache.hadoop.hive.ql.metadata.*;
-import org.apache.hadoop.hive.ql.parse.*;
-import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.ql.plan.filterDesc.sampleDesc;
 
 /**
- * General utility common functions for the Processor to convert operator into map-reduce tasks
+ * General utility common functions for the Processor to convert operator into
+ * map-reduce tasks
  */
 public class GenMapRedUtils {
   private static Log LOG;
 
   static {
-    LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
+    LOG = LogFactory
+        .getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
   }
 
   /**
    * Initialize the current plan by adding it to root tasks
-   * @param op the reduce sink operator encountered
-   * @param opProcCtx processing context
+   * 
+   * @param op
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          processing context
    */
-  public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) throws SemanticException {
+  public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
+      throws SemanticException {
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+        .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork plan = (mapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+        .getOpTaskMap();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
 
     opTaskMap.put(reducer, currTask);
     plan.setReducer(reducer);
-    reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+    reduceSinkDesc desc = op.getConf();
 
     plan.setNumReduceTasks(desc.getNumReducers());
 
     List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
 
     rootTasks.add(currTask);
-    if (reducer.getClass() == JoinOperator.class)
+    if (reducer.getClass() == JoinOperator.class) {
       plan.setNeedsTagging(true);
+    }
 
     assert currTopOp != null;
     List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
@@ -117,39 +129,51 @@
 
   /**
    * Initialize the current plan by adding it to root tasks
-   * @param op the map join operator encountered
-   * @param opProcCtx processing context
-   * @param pos position of the parent
-   */
-  public static void initMapJoinPlan(Operator<? extends Serializable> op, GenMRProcContext opProcCtx, boolean readInputMapJoin, boolean readInputUnion,
-      boolean setReducer, int pos)
-    throws SemanticException {
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+   * 
+   * @param op
+   *          the map join operator encountered
+   * @param opProcCtx
+   *          processing context
+   * @param pos
+   *          position of the parent
+   */
+  public static void initMapJoinPlan(Operator<? extends Serializable> op,
+      GenMRProcContext opProcCtx, boolean readInputMapJoin,
+      boolean readInputUnion, boolean setReducer, int pos)
+      throws SemanticException {
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+        .getMapCurrCtx();
     assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
     int parentPos = (pos == -1) ? 0 : pos;
-    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(parentPos));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
+        parentPos));
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork plan = (mapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+        .getOpTaskMap();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
 
-    // The mapjoin has already been encountered. Some context must be stored about that
+    // The mapjoin has already been encountered. Some context must be stored
+    // about that
     if (readInputMapJoin) {
       MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp();
       assert currMapJoinOp != null;
-      boolean local = ((pos == -1) || (pos == ((mapJoinDesc)currMapJoinOp.getConf()).getPosBigTable())) ? false : true;
+      boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf())
+          .getPosBigTable())) ? false : true;
 
       if (setReducer) {
-        Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+        Operator<? extends Serializable> reducer = op.getChildOperators()
+            .get(0);
         plan.setReducer(reducer);
         opTaskMap.put(reducer, currTask);
-        if (reducer.getClass() == JoinOperator.class)
+        if (reducer.getClass() == JoinOperator.class) {
           plan.setNeedsTagging(true);
-        reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+        }
+        reduceSinkDesc desc = (reduceSinkDesc) op.getConf();
         plan.setNumReduceTasks(desc.getNumReducers());
-      }
-      else
+      } else {
         opTaskMap.put(op, currTask);
+      }
 
       if (!readInputUnion) {
         GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
@@ -161,24 +185,22 @@
           taskTmpDir = mjCtx.getTaskTmpDir();
           tt_desc = mjCtx.getTTDesc();
           rootOp = mjCtx.getRootMapJoinOp();
-        }
-        else {
-          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx.getOldMapJoin());
+        } else {
+          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx
+              .getOldMapJoin());
           taskTmpDir = oldMjCtx.getTaskTmpDir();
           tt_desc = oldMjCtx.getTTDesc();
           rootOp = oldMjCtx.getRootMapJoinOp();
         }
 
         setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
-      }
-      else {
+      } else {
         initUnionPlan(opProcCtx, currTask, false);
       }
 
       opProcCtx.setCurrMapJoinOp(null);
-    }
-    else {
-      mapJoinDesc desc = (mapJoinDesc)op.getConf();
+    } else {
+      mapJoinDesc desc = (mapJoinDesc) op.getConf();
 
       // The map is overloaded to keep track of mapjoins also
       opTaskMap.put(op, currTask);
@@ -202,43 +224,50 @@
 
   /**
    * Initialize the current union plan.
-   *
-   * @param op the reduce sink operator encountered
-   * @param opProcCtx processing context
+   * 
+   * @param op
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          processing context
    */
-  public static void initUnionPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) throws SemanticException {
+  public static void initUnionPlan(ReduceSinkOperator op,
+      GenMRProcContext opProcCtx) throws SemanticException {
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+        .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork plan = (mapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+        .getOpTaskMap();
 
     opTaskMap.put(reducer, currTask);
     plan.setReducer(reducer);
-    reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+    reduceSinkDesc desc = op.getConf();
 
     plan.setNumReduceTasks(desc.getNumReducers());
 
-    if (reducer.getClass() == JoinOperator.class)
+    if (reducer.getClass() == JoinOperator.class) {
       plan.setNeedsTagging(true);
+    }
 
     initUnionPlan(opProcCtx, currTask, false);
   }
 
   /*
-   * It is a idempotent function to add various intermediate files as the source for the
-   * union. The plan has already been created.
+   * It is a idempotent function to add various intermediate files as the source
+   * for the union. The plan has already been created.
    */
-  public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask, boolean local) {
+  public static void initUnionPlan(GenMRProcContext opProcCtx,
+      Task<? extends Serializable> currTask, boolean local) {
     mapredWork plan = (mapredWork) currTask.getWork();
     UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
     assert currUnionOp != null;
     GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
     assert uCtx != null;
 
-    List<String>    taskTmpDirLst = uCtx.getTaskTmpDir();
-    List<tableDesc> tt_descLst    = uCtx.getTTDesc();
+    List<String> taskTmpDirLst = uCtx.getTaskTmpDir();
+    List<tableDesc> tt_descLst = uCtx.getTTDesc();
     assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty();
     assert taskTmpDirLst.size() == tt_descLst.size();
     int size = taskTmpDirLst.size();
@@ -250,7 +279,8 @@
       if (plan.getPathToAliases().get(taskTmpDir) == null) {
         plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
         plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-        plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+        plan.getPathToPartitionInfo().put(taskTmpDir,
+            new partitionDesc(tt_desc, null));
         plan.getAliasToWork().put(taskTmpDir, currUnionOp);
       }
     }
@@ -258,19 +288,22 @@
 
   /**
    * Merge the current task with the task for the current reducer
-   * @param op operator being processed
-   * @param oldTask the old task for the current reducer
-   * @param task the current task for the current reducer
-   * @param opProcCtx processing context
-   * @param pos position of the parent in the stack
+   * 
+   * @param op
+   *          operator being processed
+   * @param oldTask
+   *          the old task for the current reducer
+   * @param task
+   *          the current task for the current reducer
+   * @param opProcCtx
+   *          processing context
+   * @param pos
+   *          position of the parent in the stack
    */
   public static void joinPlan(Operator<? extends Serializable> op,
-                              Task<? extends Serializable> oldTask,
-                              Task<? extends Serializable> task,
-                              GenMRProcContext opProcCtx,
-                              int pos, boolean split,
-                              boolean readMapJoinData,
-                              boolean readUnionData) throws SemanticException {
+      Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
+      GenMRProcContext opProcCtx, int pos, boolean split,
+      boolean readMapJoinData, boolean readUnionData) throws SemanticException {
     Task<? extends Serializable> currTask = task;
     mapredWork plan = (mapredWork) currTask.getWork();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
@@ -279,72 +312,76 @@
     // terminate the old task and make current task dependent on it
     if (split) {
       assert oldTask != null;
-      splitTasks((ReduceSinkOperator)op, oldTask, currTask, opProcCtx, true, false, 0);
-    }
-    else {
-      if ((oldTask != null) && (oldTask.getParentTasks() != null) && !oldTask.getParentTasks().isEmpty()) {
+      splitTasks(op, oldTask, currTask, opProcCtx, true, false, 0);
+    } else {
+      if ((oldTask != null) && (oldTask.getParentTasks() != null)
+          && !oldTask.getParentTasks().isEmpty()) {
         parTasks = new ArrayList<Task<? extends Serializable>>();
         parTasks.addAll(oldTask.getParentTasks());
 
         Object[] parTaskArr = parTasks.toArray();
-        for (int i = 0; i < parTaskArr.length; i++)
-          ((Task<? extends Serializable>)parTaskArr[i]).removeDependentTask(oldTask);
+        for (Object element : parTaskArr) {
+          ((Task<? extends Serializable>) element).removeDependentTask(oldTask);
+        }
       }
     }
 
     if (currTopOp != null) {
       List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
-      String                                 currAliasId = opProcCtx.getCurrAliasId();
+      String currAliasId = opProcCtx.getCurrAliasId();
 
       if (!seenOps.contains(currTopOp)) {
         seenOps.add(currTopOp);
         boolean local = false;
-        if (pos != -1)
-          local = (pos == ((mapJoinDesc)op.getConf()).getPosBigTable()) ? false : true;
+        if (pos != -1) {
+          local = (pos == ((mapJoinDesc) op.getConf()).getPosBigTable()) ? false
+              : true;
+        }
         setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
       }
       currTopOp = null;
       opProcCtx.setCurrTopOp(currTopOp);
-    }
-    else if (opProcCtx.getCurrMapJoinOp() != null) {
-      MapJoinOperator mjOp  = opProcCtx.getCurrMapJoinOp();
+    } else if (opProcCtx.getCurrMapJoinOp() != null) {
+      MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp();
       if (readUnionData) {
         initUnionPlan(opProcCtx, currTask, false);
-      }
-      else {
+      } else {
         GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
 
-        // In case of map-join followed by map-join, the file needs to be obtained from the old map join
+        // In case of map-join followed by map-join, the file needs to be
+        // obtained from the old map join
         MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin();
-        String          taskTmpDir = null;
-        tableDesc       tt_desc    = null;
+        String taskTmpDir = null;
+        tableDesc tt_desc = null;
         Operator<? extends Serializable> rootOp = null;
 
         if (oldMapJoin == null) {
           taskTmpDir = mjCtx.getTaskTmpDir();
-          tt_desc    = mjCtx.getTTDesc();
-          rootOp     = mjCtx.getRootMapJoinOp();
-        }
-        else {
+          tt_desc = mjCtx.getTTDesc();
+          rootOp = mjCtx.getRootMapJoinOp();
+        } else {
           GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(oldMapJoin);
           assert oldMjCtx != null;
           taskTmpDir = oldMjCtx.getTaskTmpDir();
-          tt_desc    = oldMjCtx.getTTDesc();
-          rootOp     = oldMjCtx.getRootMapJoinOp();
+          tt_desc = oldMjCtx.getTTDesc();
+          rootOp = oldMjCtx.getRootMapJoinOp();
         }
 
-        boolean local = ((pos == -1) || (pos == ((mapJoinDesc)mjOp.getConf()).getPosBigTable())) ? false : true;
+        boolean local = ((pos == -1) || (pos == (mjOp.getConf())
+            .getPosBigTable())) ? false : true;
         setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
       }
       opProcCtx.setCurrMapJoinOp(null);
 
       if ((oldTask != null) && (parTasks != null)) {
-        for (Task<? extends Serializable> parTask : parTasks)
+        for (Task<? extends Serializable> parTask : parTasks) {
           parTask.addDependentTask(currTask);
+        }
       }
 
-      if (opProcCtx.getRootTasks().contains(currTask))
+      if (opProcCtx.getRootTasks().contains(currTask)) {
         opProcCtx.getRootTasks().remove(currTask);
+      }
     }
 
     opProcCtx.setCurrTask(currTask);
@@ -352,26 +389,31 @@
 
   /**
    * Split the current plan by creating a temporary destination
-   * @param op the reduce sink operator encountered
-   * @param opProcCtx processing context
+   * 
+   * @param op
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          processing context
    */
   public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
-    throws SemanticException {
+      throws SemanticException {
     // Generate a new task
     mapredWork cplan = getMapRedWork();
     ParseContext parseCtx = opProcCtx.getParseCtx();
-    Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx.getConf());
+    Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+        .getConf());
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
 
     // Add the reducer
     cplan.setReducer(reducer);
-    reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+    reduceSinkDesc desc = op.getConf();
 
     cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
 
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+        .getOpTaskMap();
     opTaskMap.put(reducer, redTask);
-    Task<? extends Serializable> currTask    = opProcCtx.getCurrTask();
+    Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
 
     splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
     opProcCtx.getRootOps().add(op);
@@ -379,30 +421,36 @@
 
   /**
    * set the current task in the mapredWork
-   * @param alias_id current alias
-   * @param topOp    the top operator of the stack
-   * @param plan     current plan
-   * @param local    whether you need to add to map-reduce or local work
-   * @param opProcCtx processing context
-   */
-  public static void setTaskPlan(String alias_id, Operator<? extends Serializable> topOp,
-      mapredWork plan, boolean local, GenMRProcContext opProcCtx)
-    throws SemanticException {
+   * 
+   * @param alias_id
+   *          current alias
+   * @param topOp
+   *          the top operator of the stack
+   * @param plan
+   *          current plan
+   * @param local
+   *          whether you need to add to map-reduce or local work
+   * @param opProcCtx
+   *          processing context
+   */
+  public static void setTaskPlan(String alias_id,
+      Operator<? extends Serializable> topOp, mapredWork plan, boolean local,
+      GenMRProcContext opProcCtx) throws SemanticException {
     ParseContext parseCtx = opProcCtx.getParseCtx();
     Set<ReadEntity> inputs = opProcCtx.getInputs();
 
     ArrayList<Path> partDir = new ArrayList<Path>();
     ArrayList<partitionDesc> partDesc = new ArrayList<partitionDesc>();
 
-    Path       tblDir  = null;
-    tableDesc  tblDesc = null;
+    Path tblDir = null;
+    tableDesc tblDesc = null;
 
     PrunedPartitionList partsList = null;
 
     try {
       partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
-                                        parseCtx.getOpToPartPruner().get(topOp),
-                                        opProcCtx.getConf(), alias_id, parseCtx.getPrunedPartitions());
+          parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(),
+          alias_id, parseCtx.getPrunedPartitions());
     } catch (SemanticException e) {
       throw e;
     } catch (HiveException e) {
@@ -412,35 +460,40 @@
 
     // Generate the map work for this alias_id
     Set<Partition> parts = null;
-    // pass both confirmed and unknown partitions through the map-reduce framework
+    // pass both confirmed and unknown partitions through the map-reduce
+    // framework
 
     parts = partsList.getConfirmedPartns();
     parts.addAll(partsList.getUnknownPartns());
     partitionDesc aliasPartnDesc = null;
-    try{
-    	if (parts.isEmpty()) {
-  			if (!partsList.getDeniedPartns().isEmpty())
-  				aliasPartnDesc = Utilities.getPartitionDesc(partsList.getDeniedPartns()
-  				    .iterator().next());
-  		} else {
-  			aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
-  		}
+    try {
+      if (parts.isEmpty()) {
+        if (!partsList.getDeniedPartns().isEmpty()) {
+          aliasPartnDesc = Utilities.getPartitionDesc(partsList
+              .getDeniedPartns().iterator().next());
+        }
+      } else {
+        aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
+      }
     } catch (HiveException e) {
-    	LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
       throw new SemanticException(e.getMessage(), e);
     }
 
     // The table does not have any partitions
-    if (aliasPartnDesc == null)
-      aliasPartnDesc = new partitionDesc(Utilities.getTableDesc(parseCtx.getTopToTable().get(topOp)), null);
+    if (aliasPartnDesc == null) {
+      aliasPartnDesc = new partitionDesc(Utilities.getTableDesc(parseCtx
+          .getTopToTable().get(topOp)), null);
+    }
 
     plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
 
     for (Partition part : parts) {
-      if (part.getTable().isPartitioned())
+      if (part.getTable().isPartitioned()) {
         inputs.add(new ReadEntity(part));
-      else
+      } else {
         inputs.add(new ReadEntity(part.getTable()));
+      }
 
       // Later the properties have to come from the partition as opposed
       // to from the table in order to support versioning.
@@ -449,8 +502,7 @@
 
       if (sampleDescr != null) {
         paths = SamplePruner.prune(part, sampleDescr);
-      }
-      else {
+      } else {
         paths = part.getPath();
       }
 
@@ -462,23 +514,24 @@
         tblDesc = Utilities.getTableDesc(part.getTable());
       }
 
-      for (Path p: paths) {
-        if(p == null)
+      for (Path p : paths) {
+        if (p == null) {
           continue;
+        }
         String path = p.toString();
         LOG.debug("Adding " + path + " of table" + alias_id);
 
         partDir.add(p);
-        try{
-        	partDesc.add(Utilities.getPartitionDesc(part));
+        try {
+          partDesc.add(Utilities.getPartitionDesc(part));
         } catch (HiveException e) {
-        	LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+          LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
           throw new SemanticException(e.getMessage(), e);
         }
       }
     }
 
-    Iterator<Path>          iterPath      = partDir.iterator();
+    Iterator<Path> iterPath = partDir.iterator();
     Iterator<partitionDesc> iterPartnDesc = partDesc.iterator();
 
     if (!local) {
@@ -499,53 +552,65 @@
 
       assert plan.getAliasToWork().get(alias_id) == null;
       plan.getAliasToWork().put(alias_id, topOp);
-    }
-    else {
+    } else {
       // populate local work if needed
       mapredLocalWork localPlan = plan.getMapLocalWork();
-      if (localPlan == null)
+      if (localPlan == null) {
         localPlan = new mapredLocalWork(
             new LinkedHashMap<String, Operator<? extends Serializable>>(),
             new LinkedHashMap<String, fetchWork>());
+      }
 
       assert localPlan.getAliasToWork().get(alias_id) == null;
       assert localPlan.getAliasToFetchWork().get(alias_id) == null;
       localPlan.getAliasToWork().put(alias_id, topOp);
-      if (tblDir == null)
-        localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(fetchWork.convertPathToStringArray(partDir), partDesc));
-      else
-        localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(tblDir.toString(), tblDesc));
+      if (tblDir == null) {
+        localPlan.getAliasToFetchWork()
+            .put(
+                alias_id,
+                new fetchWork(fetchWork.convertPathToStringArray(partDir),
+                    partDesc));
+      } else {
+        localPlan.getAliasToFetchWork().put(alias_id,
+            new fetchWork(tblDir.toString(), tblDesc));
+      }
       plan.setMapLocalWork(localPlan);
     }
   }
 
-
   /**
    * set the current task in the mapredWork
-   * @param alias    current alias
-   * @param topOp    the top operator of the stack
-   * @param plan     current plan
-   * @param local    whether you need to add to map-reduce or local work
-   * @param tt_desc  table descriptor
-   */
-  public static void setTaskPlan(String path, String alias, Operator<? extends Serializable> topOp,
-                                 mapredWork plan, boolean local, tableDesc tt_desc)
-    throws SemanticException {
+   * 
+   * @param alias
+   *          current alias
+   * @param topOp
+   *          the top operator of the stack
+   * @param plan
+   *          current plan
+   * @param local
+   *          whether you need to add to map-reduce or local work
+   * @param tt_desc
+   *          table descriptor
+   */
+  public static void setTaskPlan(String path, String alias,
+      Operator<? extends Serializable> topOp, mapredWork plan, boolean local,
+      tableDesc tt_desc) throws SemanticException {
 
     if (!local) {
-      if (plan.getPathToAliases().get(path) == null)
+      if (plan.getPathToAliases().get(path) == null) {
         plan.getPathToAliases().put(path, new ArrayList<String>());
+      }
       plan.getPathToAliases().get(path).add(alias);
       plan.getPathToPartitionInfo().put(path, new partitionDesc(tt_desc, null));
       plan.getAliasToWork().put(alias, topOp);
-    }
-    else {
+    } else {
       // populate local work if needed
       mapredLocalWork localPlan = plan.getMapLocalWork();
-      if (localPlan == null)
+      if (localPlan == null) {
         localPlan = new mapredLocalWork(
-                                        new LinkedHashMap<String, Operator<? extends Serializable>>(),
-                                        new LinkedHashMap<String, fetchWork>());
+            new LinkedHashMap<String, Operator<? extends Serializable>>(),
+            new LinkedHashMap<String, fetchWork>());
+      }
 
       assert localPlan.getAliasToWork().get(alias) == null;
       assert localPlan.getAliasToFetchWork().get(alias) == null;
@@ -557,15 +622,20 @@
 
   /**
    * set key and value descriptor
-   * @param plan     current plan
-   * @param topOp    current top operator in the path
-   */
-  public static void setKeyAndValueDesc(mapredWork plan, Operator<? extends Serializable> topOp) {
-    if (topOp == null)
+   * 
+   * @param plan
+   *          current plan
+   * @param topOp
+   *          current top operator in the path
+   */
+  public static void setKeyAndValueDesc(mapredWork plan,
+      Operator<? extends Serializable> topOp) {
+    if (topOp == null) {
       return;
+    }
 
     if (topOp instanceof ReduceSinkOperator) {
-      ReduceSinkOperator rs = (ReduceSinkOperator)topOp;
+      ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
       plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
       int tag = Math.max(0, rs.getConf().getTag());
       List<tableDesc> tagToSchema = plan.getTagToValueDesc();
@@ -574,9 +644,10 @@
       }
       tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
     } else {
-      List<Operator<? extends Serializable>> children = topOp.getChildOperators();
+      List<Operator<? extends Serializable>> children = topOp
+          .getChildOperators();
       if (children != null) {
-        for(Operator<? extends Serializable> op: children) {
+        for (Operator<? extends Serializable> op : children) {
           setKeyAndValueDesc(plan, op);
         }
       }
@@ -585,13 +656,15 @@
 
   /**
    * create a new plan and return
+   * 
    * @return the new plan
    */
   public static mapredWork getMapRedWork() {
     mapredWork work = new mapredWork();
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, partitionDesc>());
-    work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+    work
+        .setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
     work.setTagToValueDesc(new ArrayList<tableDesc>());
     work.setReducer(null);
     return work;
@@ -599,13 +672,17 @@
 
   /**
    * insert in the map for the operator to row resolver
-   * @param op operator created
-   * @param rr row resolver
-   * @param parseCtx parse context
+   * 
+   * @param op
+   *          operator created
+   * @param rr
+   *          row resolver
+   * @param parseCtx
+   *          parse context
    */
   @SuppressWarnings("nls")
-  private static Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx)
-  {
+  private static Operator<? extends Serializable> putOpInsertMap(
+      Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx) {
     OpParseContext ctx = new OpParseContext(rr);
     parseCtx.getOpParseCtx().put(op, ctx);
     return op;
@@ -622,40 +699,47 @@
    * @param pos position of the parent
    **/
   public static void splitTasks(Operator<? extends Serializable> op,
-                                 Task<? extends Serializable> parentTask,
-                                 Task<? extends Serializable> childTask,
-                                 GenMRProcContext opProcCtx, boolean setReducer,
-                                 boolean local, int posn) throws SemanticException {
-    mapredWork plan = (mapredWork) childTask.getWork();
+      Task<? extends Serializable> parentTask,
+      Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
+      boolean setReducer, boolean local, int posn) throws SemanticException {
+    childTask.getWork();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
 
     ParseContext parseCtx = opProcCtx.getParseCtx();
     parentTask.addDependentTask(childTask);
 
-    // Root Task cannot depend on any other task, therefore childTask cannot be a root Task
+    // Root Task cannot depend on any other task, therefore childTask cannot be
+    // a root Task
     List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-    if (rootTasks.contains(childTask))
+    if (rootTasks.contains(childTask)) {
       rootTasks.remove(childTask);
+    }
 
     // generate the temporary file
     Context baseCtx = parseCtx.getContext();
     String taskTmpDir = baseCtx.getMRTmpFileURI();
 
     Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
-    tableDesc tt_desc =
-      PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+    tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+        .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
 
     // Create a file sink operator for this file name
-    boolean compressIntermediate = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
-    fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+    boolean compressIntermediate = parseCtx.getConf().getBoolVar(
+        HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+    fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc,
+        compressIntermediate);
     if (compressIntermediate) {
-      desc.setCompressCodec(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
-      desc.setCompressType(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+      desc.setCompressCodec(parseCtx.getConf().getVar(
+          HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+      desc.setCompressType(parseCtx.getConf().getVar(
+          HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
     }
-    Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory.get(desc, parent.getSchema()), null, parseCtx);
+    Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
+        .get(desc, parent.getSchema()), null, parseCtx);
 
     // replace the reduce child with this operator
-    List<Operator<? extends Serializable>> childOpList = parent.getChildOperators();
+    List<Operator<? extends Serializable>> childOpList = parent
+        .getChildOperators();
     for (int pos = 0; pos < childOpList.size(); pos++) {
       if (childOpList.get(pos) == op) {
         childOpList.set(pos, fs_op);
@@ -668,15 +752,16 @@
     fs_op.setParentOperators(parentOpList);
 
     // create a dummy tableScan operator on top of op
-    Operator<? extends Serializable> ts_op =
-      putOpInsertMap(OperatorFactory.get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
+    Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
+        .get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
 
     childOpList = new ArrayList<Operator<? extends Serializable>>();
     childOpList.add(op);
     ts_op.setChildOperators(childOpList);
     op.getParentOperators().set(posn, ts_op);
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+        .getMapCurrCtx();
     mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
 
     String streamDesc = taskTmpDir;
@@ -690,14 +775,16 @@
         streamDesc = "$INTNAME";
         origStreamDesc = streamDesc;
         int pos = 0;
-        while (cplan.getAliasToWork().get(streamDesc) != null)
+        while (cplan.getAliasToWork().get(streamDesc) != null) {
           streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+        }
       }
 
       // TODO: Allocate work to remove the temporary files and make that
       // dependent on the redTask
-      if (reducer.getClass() == JoinOperator.class)
+      if (reducer.getClass() == JoinOperator.class) {
         cplan.setNeedsTagging(true);
+      }
     }
 
     // Add the path to alias mapping
@@ -705,18 +792,19 @@
 
     // This can be cleaned up as a function table in future
     if (op instanceof MapJoinOperator) {
-      MapJoinOperator mjOp = (MapJoinOperator)op;
+      MapJoinOperator mjOp = (MapJoinOperator) op;
       opProcCtx.setCurrMapJoinOp(mjOp);
       GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
-      if (mjCtx == null)
+      if (mjCtx == null) {
         mjCtx = new GenMRMapJoinCtx(taskTmpDir, tt_desc, ts_op, null);
-      else {
+      } else {
         mjCtx.setTaskTmpDir(taskTmpDir);
         mjCtx.setTTDesc(tt_desc);
         mjCtx.setRootMapJoinOp(ts_op);
       }
       opProcCtx.setMapJoinCtx(mjOp, mjCtx);
-      opProcCtx.getMapCurrCtx().put(parent, new GenMapRedCtx(childTask, null, null));
+      opProcCtx.getMapCurrCtx().put(parent,
+          new GenMapRedCtx(childTask, null, null));
     }
 
     currTopOp = null;
@@ -727,7 +815,8 @@
     opProcCtx.setCurrTask(childTask);
   }
 
-  static public void mergeMapJoinUnion(UnionOperator union, GenMRProcContext ctx, int pos) throws SemanticException {
+  static public void mergeMapJoinUnion(UnionOperator union,
+      GenMRProcContext ctx, int pos) throws SemanticException {
     ParseContext parseCtx = ctx.getParseCtx();
     UnionProcContext uCtx = parseCtx.getUCtx();
 
@@ -739,7 +828,7 @@
     GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
     Task<? extends Serializable> uTask = null;
 
-    Operator<? extends Serializable> parent = union.getParentOperators().get(pos);
+    union.getParentOperators().get(pos);
     mapredWork uPlan = null;
 
     // union is encountered for the first time
@@ -749,10 +838,9 @@
       uTask = TaskFactory.get(uPlan, parseCtx.getConf());
       uCtxTask.setUTask(uTask);
       ctx.setUnionTask(union, uCtxTask);
-    }
-    else {
+    } else {
       uTask = uCtxTask.getUTask();
-      uPlan = (mapredWork)uTask.getWork();
+      uPlan = (mapredWork) uTask.getWork();
     }
 
     // If there is a mapjoin at position 'pos'
@@ -762,30 +850,34 @@
       if (uPlan.getPathToAliases().get(taskTmpDir) == null) {
         uPlan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
         uPlan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-        uPlan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(mjCtx.getTTDesc(), null));
+        uPlan.getPathToPartitionInfo().put(taskTmpDir,
+            new partitionDesc(mjCtx.getTTDesc(), null));
         uPlan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
       }
 
-      for (Task t : currTask.getParentTasks())
+      for (Task t : currTask.getParentTasks()) {
         t.addDependentTask(uTask);
+      }
       try {
         boolean notDone = true;
         while (notDone) {
-          for (Task t : currTask.getParentTasks())
+          for (Task t : currTask.getParentTasks()) {
             t.removeDependentTask(currTask);
+          }
           notDone = false;
         }
       } catch (java.util.ConcurrentModificationException e) {
       }
-    }
-    else
+    } else {
       setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(), uPlan, false, ctx);
+    }
 
     ctx.setCurrTask(uTask);
     ctx.setCurrAliasId(null);
     ctx.setCurrTopOp(null);
     ctx.setCurrMapJoinOp(null);
 
-    ctx.getMapCurrCtx().put((Operator<? extends Serializable>)union, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+    ctx.getMapCurrCtx().put(union,
+        new GenMapRedCtx(ctx.getCurrTask(), null, null));
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Thu Jan 21 10:37:58 2010
@@ -130,13 +130,14 @@
         throws SemanticException {
 
       // if this is not a HASH groupby, return
-      if (curr.getConf().getMode() != groupByDesc.Mode.HASH)
+      if (curr.getConf().getMode() != groupByDesc.Mode.HASH) {
         return;
+      }
 
-      Set<String> tblNames = this.pGraphContext.getGroupOpToInputTables().get(
-          curr);
-      if (tblNames == null || tblNames.size() == 0)
+      Set<String> tblNames = pGraphContext.getGroupOpToInputTables().get(curr);
+      if (tblNames == null || tblNames.size() == 0) {
         return;
+      }
 
       boolean bucketGroupBy = true;
       groupByDesc desc = curr.getConf();
@@ -144,7 +145,7 @@
       groupByKeys.addAll(desc.getKeys());
       // compute groupby columns from groupby keys
       List<String> groupByCols = new ArrayList<String>();
-      while (groupByKeys.size() >0) {
+      while (groupByKeys.size() > 0) {
         exprNodeDesc node = groupByKeys.remove(0);
         if (node instanceof exprNodeColumnDesc) {
           groupByCols.addAll(node.getCols());
@@ -155,22 +156,24 @@
           groupByKeys.add(0, ((exprNodeFieldDesc) node).getDesc());
           continue;
         } else if (node instanceof exprNodeGenericFuncDesc) {
-          exprNodeGenericFuncDesc udfNode = ((exprNodeGenericFuncDesc)node);
+          exprNodeGenericFuncDesc udfNode = ((exprNodeGenericFuncDesc) node);
           GenericUDF udf = udfNode.getGenericUDF();
-          if(!FunctionRegistry.isDeterministic(udf))
+          if (!FunctionRegistry.isDeterministic(udf)) {
             return;
+          }
           groupByKeys.addAll(0, udfNode.getChildExprs());
         } else {
           return;
         }
       }
-      
-      if(groupByCols.size() == 0)
+
+      if (groupByCols.size() == 0) {
         return;
+      }
 
       for (String table : tblNames) {
-        Operator<? extends Serializable> topOp = this.pGraphContext.getTopOps()
-            .get(table);
+        Operator<? extends Serializable> topOp = pGraphContext.getTopOps().get(
+            table);
         if (topOp == null || (!(topOp instanceof TableScanOperator))) {
           // this is in a sub-query.
           // In future, we need to infer subq's columns propery. For example
@@ -180,21 +183,25 @@
           return;
         }
         TableScanOperator ts = (TableScanOperator) topOp;
-        Table destTable = this.pGraphContext.getTopToTable().get(ts);
-        if (destTable == null)
+        Table destTable = pGraphContext.getTopToTable().get(ts);
+        if (destTable == null) {
           return;
+        }
         if (!destTable.isPartitioned()) {
           List<String> bucketCols = destTable.getBucketCols();
-          List<String> sortCols = Utilities.getColumnNamesFromSortCols(destTable.getSortCols());
-          bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols, sortCols);
-          if (!bucketGroupBy)
+          List<String> sortCols = Utilities
+              .getColumnNamesFromSortCols(destTable.getSortCols());
+          bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
+              sortCols);
+          if (!bucketGroupBy) {
             return;
+          }
         } else {
           PrunedPartitionList partsList = null;
           try {
-            partsList = PartitionPruner.prune(destTable, this.pGraphContext
-                .getOpToPartPruner().get(ts), this.pGraphContext.getConf(),
-                table, this.pGraphContext.getPrunedPartitions());
+            partsList = PartitionPruner.prune(destTable, pGraphContext
+                .getOpToPartPruner().get(ts), pGraphContext.getConf(), table,
+                pGraphContext.getPrunedPartitions());
           } catch (HiveException e) {
             // Has to use full name to make sure it does not conflict with
             // org.apache.commons.lang.StringUtils
@@ -206,10 +213,13 @@
           parts.addAll(partsList.getUnknownPartns());
           for (Partition part : parts) {
             List<String> bucketCols = part.getBucketCols();
-            List<String> sortCols = Utilities.getColumnNamesFromSortCols(part.getTPartition().getSd().getSortCols());
-            bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols, sortCols);
-            if (!bucketGroupBy)
+            List<String> sortCols = Utilities.getColumnNamesFromSortCols(part
+                .getTPartition().getSd().getSortCols());
+            bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
+                sortCols);
+            if (!bucketGroupBy) {
               return;
+            }
           }
         }
       }
@@ -235,26 +245,29 @@
      * @throws SemanticException
      */
     private boolean matchBucketOrSortedColumns(List<String> groupByCols,
-        List<String> bucketCols, List<String> sortCols) throws SemanticException {
+        List<String> bucketCols, List<String> sortCols)
+        throws SemanticException {
       boolean ret = false;
-      
+
       if (sortCols == null || sortCols.size() == 0) {
         ret = matchBucketColumns(groupByCols, bucketCols);
       }
-      
+
       if (!ret && sortCols != null && sortCols.size() >= groupByCols.size()) {
         // check sort columns, if groupByCols is a prefix subset of sort
         // columns, we will use sorted group by. For example, if data is sorted
         // by column a, b, c, and a query wants to group by b,a, we will use
-        // sorted group by. But if the query wants to groupby b,c, then sorted group by can not be used.
+        // sorted group by. But if the query wants to groupby b,c, then sorted
+        // group by can not be used.
         int num = groupByCols.size();
-        for(int i =0;i<num; i++){
-          if(sortCols.indexOf(groupByCols.get(i)) > (num -1))
+        for (int i = 0; i < num; i++) {
+          if (sortCols.indexOf(groupByCols.get(i)) > (num - 1)) {
             return false;
+          }
         }
         return true;
       }
-      
+
       return ret;
     }
 
@@ -267,13 +280,15 @@
         List<String> tblBucketCols) throws SemanticException {
 
       if (tblBucketCols == null || tblBucketCols.size() == 0
-          || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size())
+          || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size()) {
         return false;
+      }
 
       for (int i = 0; i < grpCols.size(); i++) {
         String tblCol = grpCols.get(i);
-        if (!tblBucketCols.contains(tblCol))
+        if (!tblBucketCols.contains(tblCol)) {
           return false;
+        }
       }
       return true;
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Thu Jan 21 10:37:58 2010
@@ -19,15 +19,10 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -38,29 +33,29 @@
 /**
  * Implementation of rule-based join table reordering optimization. User passes
  * hints to specify which tables are to be streamed and they are moved to have
- * largest tag so that they are processed last.
- * In future, once statistics are implemented, this transformation can also be
- * done based on costs.
+ * largest tag so that they are processed last. In future, once statistics are
+ * implemented, this transformation can also be done based on costs.
  */
 public class JoinReorder implements Transform {
   /**
    * Estimate the size of the output based on the STREAMTABLE hints. To do so
-   * the whole tree is traversed. Possible sizes:
-   *   0: the operator and its subtree don't contain any big tables
-   *   1: the subtree of the operator contains a big table
-   *   2: the operator is a big table
-   *
-   * @param operator  The operator which output size is to be estimated
-   * @param bigTables Set of tables that should be streamed
+   * the whole tree is traversed. Possible sizes: 0: the operator and its
+   * subtree don't contain any big tables 1: the subtree of the operator
+   * contains a big table 2: the operator is a big table
+   * 
+   * @param operator
+   *          The operator which output size is to be estimated
+   * @param bigTables
+   *          Set of tables that should be streamed
    * @return The estimated size - 0 (no streamed tables), 1 (streamed tables in
-   * subtree) or 2 (a streamed table)
+   *         subtree) or 2 (a streamed table)
    */
   private int getOutputSize(Operator<? extends Serializable> operator,
-                            Set<String> bigTables) {
+      Set<String> bigTables) {
     // If a join operator contains a big subtree, there is a chance that its
     // output is also big, so the output size is 1 (medium)
     if (operator instanceof JoinOperator) {
-      for(Operator<? extends Serializable> o: operator.getParentOperators()) {
+      for (Operator<? extends Serializable> o : operator.getParentOperators()) {
         if (getOutputSize(o, bigTables) != 0) {
           return 1;
         }
@@ -69,7 +64,7 @@
 
     // If a table is in bigTables then its output is big (2)
     if (operator instanceof TableScanOperator) {
-      String alias = ((TableScanOperator)operator).getConf().getAlias();
+      String alias = ((TableScanOperator) operator).getConf().getAlias();
       if (bigTables.contains(alias)) {
         return 2;
       }
@@ -79,7 +74,7 @@
     // the biggest output from a parent
     int maxSize = 0;
     if (operator.getParentOperators() != null) {
-      for(Operator<? extends Serializable> o: operator.getParentOperators()) {
+      for (Operator<? extends Serializable> o : operator.getParentOperators()) {
         int current = getOutputSize(o, bigTables);
         if (current > maxSize) {
           maxSize = current;
@@ -92,14 +87,15 @@
 
   /**
    * Find all big tables from STREAMTABLE hints
-   *
-   * @param joinCtx The join context
+   * 
+   * @param joinCtx
+   *          The join context
    * @return Set of all big tables
    */
   private Set<String> getBigTables(ParseContext joinCtx) {
     Set<String> bigTables = new HashSet<String>();
 
-    for (QBJoinTree qbJoin: joinCtx.getJoinContext().values()) {
+    for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
       if (qbJoin.getStreamAliases() != null) {
         bigTables.addAll(qbJoin.getStreamAliases());
       }
@@ -111,20 +107,22 @@
   /**
    * Reorder the tables in a join operator appropriately (by reordering the tags
    * of the reduces sinks)
-   *
-   * @param joinOp The join operator to be processed
-   * @param bigTables Set of all big tables
+   * 
+   * @param joinOp
+   *          The join operator to be processed
+   * @param bigTables
+   *          Set of all big tables
    */
   private void reorder(JoinOperator joinOp, Set<String> bigTables) {
     int count = joinOp.getParentOperators().size();
 
     // Find the biggest reduce sink
-    int biggestPos  = count - 1;
-    int biggestSize = getOutputSize(joinOp.getParentOperators().get(biggestPos),
-                                    bigTables);
+    int biggestPos = count - 1;
+    int biggestSize = getOutputSize(
+        joinOp.getParentOperators().get(biggestPos), bigTables);
     for (int i = 0; i < count - 1; i++) {
       int currSize = getOutputSize(joinOp.getParentOperators().get(i),
-                                   bigTables);
+          bigTables);
       if (currSize > biggestSize) {
         biggestSize = currSize;
         biggestPos = i;
@@ -135,14 +133,14 @@
     if (biggestPos != (count - 1)) {
       Byte[] tagOrder = joinOp.getConf().getTagOrder();
       Byte temp = tagOrder[biggestPos];
-      tagOrder[biggestPos] = tagOrder[count-1];
-      tagOrder[count-1] = temp;
+      tagOrder[biggestPos] = tagOrder[count - 1];
+      tagOrder[count - 1] = temp;
 
       // Update tags of reduce sinks
-      ((ReduceSinkOperator)joinOp.getParentOperators().get(biggestPos))
-        .getConf().setTag(count-1);
-      ((ReduceSinkOperator)joinOp.getParentOperators().get(count-1)).getConf()
-        .setTag(biggestPos);
+      ((ReduceSinkOperator) joinOp.getParentOperators().get(biggestPos))
+          .getConf().setTag(count - 1);
+      ((ReduceSinkOperator) joinOp.getParentOperators().get(count - 1))
+          .getConf().setTag(biggestPos);
     }
   }
 
@@ -150,13 +148,14 @@
    * Transform the query tree. For each join, check which reduce sink will
    * output the biggest result (based on STREAMTABLE hints) and give it the
    * biggest tag so that it gets streamed.
-   *
-   * @param pactx current parse context
+   * 
+   * @param pactx
+   *          current parse context
    */
   public ParseContext transform(ParseContext pactx) throws SemanticException {
     Set<String> bigTables = getBigTables(pactx);
 
-    for (JoinOperator joinOp: pactx.getJoinContext().keySet()) {
+    for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
       reorder(joinOp, bigTables);
     }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Thu Jan 21 10:37:58 2010
@@ -18,36 +18,34 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Stack;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 
 /**
  * Operator factory for MapJoin processing
@@ -58,13 +56,14 @@
     int pos = 0;
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
-    Operator<? extends Serializable> parent = (Operator<? extends Serializable>)stack.get(size - 2);
+    Operator<? extends Serializable> parent = (Operator<? extends Serializable>) stack
+        .get(size - 2);
     List<Operator<? extends Serializable>> parOp = op.getParentOperators();
     pos = parOp.indexOf(parent);
-    assert pos < parOp.size(); 
+    assert pos < parOp.size();
     return pos;
   }
-  
+
   /**
    * TableScan followed by MapJoin
    */
@@ -73,43 +72,49 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator)nd;
-      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
-      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+          .getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+          pos));
+      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       mapredWork currPlan = (mapredWork) currTask.getWork();
-      Operator<? extends Serializable> currTopOp   = mapredCtx.getCurrTopOp();
+      Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
       String currAliasId = mapredCtx.getCurrAliasId();
       Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+          .getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-      
+
       ctx.setCurrTopOp(currTopOp);
       ctx.setCurrAliasId(currAliasId);
       ctx.setCurrTask(currTask);
-      
+
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
         GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, false, false, pos);
       }
-      // The current plan can be thrown away after being merged with the original plan
+      // The current plan can be thrown away after being merged with the
+      // original plan
       else {
-        GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, false);
+        GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
+            false, false);
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
-      
-      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
+          .getCurrTopOp(), ctx.getCurrAliasId()));
       return null;
     }
   }
-  
+
   /**
    * ReduceSink followed by MapJoin
    */
@@ -118,37 +123,43 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator)nd;
-      GenMRProcContext opProcCtx = (GenMRProcContext)procCtx;
-      
+      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
+
       mapredWork cplan = GenMapRedUtils.getMapRedWork();
       ParseContext parseCtx = opProcCtx.getParseCtx();
-      Task<? extends Serializable> redTask  = TaskFactory.get(cplan, parseCtx.getConf());
+      Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+          .getConf());
       Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
-      boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
-      
-      GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false, local, pos);
+      boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+          : true;
+
+      GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
+          local, pos);
 
       currTask = opProcCtx.getCurrTask();
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+          .getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
-      
+
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert cplan.getReducer() == null;
         opTaskMap.put(mapJoin, currTask);
         opProcCtx.setCurrMapJoinOp(null);
       }
-      // The current plan can be thrown away after being merged with the original plan
+      // The current plan can be thrown away after being merged with the
+      // original plan
       else {
-        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos, false, false, false);
+        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos,
+            false, false, false);
         currTask = opMapTask;
         opProcCtx.setCurrTask(currTask);
       }
-      
+
       return null;
     }
   }
@@ -159,87 +170,93 @@
   public static class MapJoin implements NodeProcessor {
 
     /**
-     * Create a task by splitting the plan below the join. The reason, we have to do so in the
-     * processing of Select and not MapJoin is due to the walker. While processing a node, it is not safe
-     * to alter its children because that will decide the course of the walk. It is perfectly fine to muck around
-     * with its parents though, since those nodes have already been visited.
+     * Create a task by splitting the plan below the join. The reason we have
+     * to do so in the processing of Select and not MapJoin is the walker:
+     * while processing a node, it is not safe to alter its children because
+     * that will decide the course of the walk. It is perfectly fine to muck
+     * around with its parents, though, since those nodes have already been
+     * visited.
      */
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      
-      SelectOperator  sel     = (SelectOperator)nd;
-      MapJoinOperator mapJoin = (MapJoinOperator)sel.getParentOperators().get(0);
+
+      SelectOperator sel = (SelectOperator) nd;
+      MapJoinOperator mapJoin = (MapJoinOperator) sel.getParentOperators().get(
+          0);
       assert sel.getParentOperators().size() == 1;
-      
-      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+      GenMRProcContext ctx = (GenMRProcContext) procCtx;
       ParseContext parseCtx = ctx.getParseCtx();
-      
+
       // is the mapjoin followed by a reducer
-      List<MapJoinOperator> listMapJoinOps = parseCtx.getListMapJoinOpsNoReducer();
-      
+      List<MapJoinOperator> listMapJoinOps = parseCtx
+          .getListMapJoinOpsNoReducer();
+
       if (listMapJoinOps.contains(mapJoin)) {
         ctx.setCurrAliasId(null);
         ctx.setCurrTopOp(null);
-        Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-        mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+        Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+            .getMapCurrCtx();
+        mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+            ctx.getCurrTask(), null, null));
         return null;
       }
 
       ctx.setCurrMapJoinOp(mapJoin);
-      
+
       Task<? extends Serializable> currTask = ctx.getCurrTask();
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
       if (mjCtx == null) {
         mjCtx = new GenMRMapJoinCtx();
         ctx.setMapJoinCtx(mapJoin, mjCtx);
       }
-      
+
       mapredWork mjPlan = GenMapRedUtils.getMapRedWork();
-      Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx.getConf());
-      
-      tableDesc tt_desc = 
-        PlanUtils.getIntermediateFileTableDesc(
-            PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol")); 
-      
+      Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx
+          .getConf());
+
+      tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+          .getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"));
+
       // generate the temporary file
       Context baseCtx = parseCtx.getContext();
       String taskTmpDir = baseCtx.getMRTmpFileURI();
-      
+
       // Add the path to alias mapping
       mjCtx.setTaskTmpDir(taskTmpDir);
       mjCtx.setTTDesc(tt_desc);
       mjCtx.setRootMapJoinOp(sel);
-      
+
       sel.setParentOperators(null);
-      
+
       // Create a file sink operator for this file name
-      Operator<? extends Serializable> fs_op =
-        OperatorFactory.get
-        (new fileSinkDesc(taskTmpDir, tt_desc,
-                          parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
-         mapJoin.getSchema());
-      
+      Operator<? extends Serializable> fs_op = OperatorFactory.get(
+          new fileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
+              HiveConf.ConfVars.COMPRESSINTERMEDIATE)), mapJoin.getSchema());
+
       assert mapJoin.getChildOperators().size() == 1;
       mapJoin.getChildOperators().set(0, fs_op);
-      
+
       List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
       parentOpList.add(mapJoin);
       fs_op.setParentOperators(parentOpList);
-      
+
       currTask.addDependentTask(mjTask);
-      
+
       ctx.setCurrTask(mjTask);
       ctx.setCurrAliasId(null);
       ctx.setCurrTopOp(null);
-      
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-      mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
-      
+
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+          .getMapCurrCtx();
+      mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+          ctx.getCurrTask(), null, null));
+
       return null;
     }
   }
-  
+
   /**
    * MapJoin followed by MapJoin
    */
@@ -248,50 +265,57 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator)nd;
-      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
-      ParseContext parseCtx = ctx.getParseCtx();
+      ctx.getParseCtx();
       MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
       assert oldMapJoin != null;
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
-      if (mjCtx != null)
+      if (mjCtx != null) {
         mjCtx.setOldMapJoin(oldMapJoin);
-      else
-        ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null, oldMapJoin));
+      } else {
+        ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null,
+            oldMapJoin));
+      }
       ctx.setCurrMapJoinOp(mapJoin);
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
-      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+          .getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+          pos));
+      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       mapredWork currPlan = (mapredWork) currTask.getWork();
-      String currAliasId = mapredCtx.getCurrAliasId();
+      mapredCtx.getCurrAliasId();
       Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+          .getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-      
+
       ctx.setCurrTask(currTask);
-      
+
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
         GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, false, false, pos);
       }
-      // The current plan can be thrown away after being merged with the original plan
+      // The current plan can be thrown away after being merged with the
+      // original plan
       else {
-        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false, true, false);
+        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false,
+            true, false);
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
-      
+
       mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), null, null));
       return null;
     }
   }
-  
+
   /**
    * Union followed by MapJoin
    */
@@ -300,36 +324,43 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+      GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       ParseContext parseCtx = ctx.getParseCtx();
       UnionProcContext uCtx = parseCtx.getUCtx();
 
       // union was map only - no special processing needed
-      if (uCtx.isMapOnlySubq())
-        return (new TableScanMapJoin()).process(nd, stack, procCtx, nodeOutputs);
-      
+      if (uCtx.isMapOnlySubq()) {
+        return (new TableScanMapJoin())
+            .process(nd, stack, procCtx, nodeOutputs);
+      }
+
       UnionOperator currUnion = ctx.getCurrUnionOp();
       assert currUnion != null;
-      GenMRUnionCtx unionCtx = ctx.getUnionTask(currUnion);
-      MapJoinOperator mapJoin = (MapJoinOperator)nd;
+      ctx.getUnionTask(currUnion);
+      MapJoinOperator mapJoin = (MapJoinOperator) nd;
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
-      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+          .getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+          pos));
+      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       mapredWork currPlan = (mapredWork) currTask.getWork();
       Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+          .getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-      
+
       // union result cannot be a map table
-      boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
-      if (local)
+      boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+          : true;
+      if (local) {
         throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
-      
+      }
+
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
@@ -337,26 +368,32 @@
         GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, true, false, pos);
         ctx.setCurrUnionOp(null);
       }
-      // The current plan can be thrown away after being merged with the original plan
+      // The current plan can be thrown away after being merged with the
+      // original plan
       else {
-        Task<? extends Serializable> uTask = ctx.getUnionTask(ctx.getCurrUnionOp()).getUTask();
-        if (uTask.getId().equals(opMapTask.getId()))
-          GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, true);
-        else
-          GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false, false, true);
+        Task<? extends Serializable> uTask = ctx.getUnionTask(
+            ctx.getCurrUnionOp()).getUTask();
+        if (uTask.getId().equals(opMapTask.getId())) {
+          GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
+              false, true);
+        } else {
+          GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false,
+              false, true);
+        }
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
-      
-      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
+          .getCurrTopOp(), ctx.getCurrAliasId()));
       return null;
     }
   }
-  
+
   public static NodeProcessor getTableScanMapJoin() {
     return new TableScanMapJoin();
   }
-  
+
   public static NodeProcessor getUnionMapJoin() {
     return new UnionMapJoin();
   }
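
The MapJoin (Select-after-MapJoin) processor above splits the plan below the join: it detaches the Select, points the join's output at a file sink writing an intermediate file, and schedules the remainder of the plan as a dependent task. The sketch below only shows the shape of that split; the Op and Task classes are hypothetical stand-ins, not Hive's operator and task classes.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class PlanSplitSketch {
      // Hypothetical stand-ins for operators and tasks; not Hive classes.
      static class Op {
        final String name;
        List<Op> parents = new ArrayList<Op>();
        List<Op> children = new ArrayList<Op>();
        Op(String name) { this.name = name; }
      }

      static class Task {
        final String name;
        List<Task> dependents = new ArrayList<Task>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) { dependents.add(t); }
      }

      public static void main(String[] args) {
        // Original tree: MAPJOIN -> SEL
        Op mapJoin = new Op("MAPJOIN");
        Op select = new Op("SEL");
        mapJoin.children.add(select);
        select.parents.add(mapJoin);

        // Split below the join: detach SEL (it becomes the root of the
        // follow-up plan), route the join's output into a file sink for an
        // intermediate file, and chain the follow-up plan as a dependent task.
        select.parents = null;
        Op fileSink = new Op("FS[<intermediate file>]");
        mapJoin.children.set(0, fileSink);
        fileSink.parents = new ArrayList<Op>(Collections.singletonList(mapJoin));

        Task currTask = new Task("task with MAPJOIN");
        Task mjTask = new Task("dependent task rooted at SEL");
        currTask.addDependentTask(mjTask);

        System.out.println(mapJoin.name + " now feeds " + mapJoin.children.get(0).name
            + "; " + mjTask.name + " runs after " + currTask.name);
      }
    }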