Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [14/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Thu Jan 21 10:37:58 2010
@@ -18,37 +18,36 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.List;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Stack;
-import java.io.Serializable;
 
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
@@ -56,14 +55,14 @@
 import org.apache.hadoop.hive.ql.plan.extractDesc;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.loadFileDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.plan.moveWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.tableScanDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.conf.HiveConf;
 
 /**
  * Processor for the rule - table scan followed by reduce sink
@@ -74,178 +73,206 @@
   }
 
   /**
-   * File Sink Operator encountered 
-   * @param nd the file sink operator encountered
-   * @param opProcCtx context
+   * File Sink Operator encountered
+   * 
+   * @param nd
+   *          the file sink operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
     boolean chDir = false;
     Task<? extends Serializable> currTask = ctx.getCurrTask();
 
     // Has the user enabled merging of files for map-only jobs or for all jobs
-    if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) 
-    {
+    if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) {
       List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
 
-      // In case of unions or map-joins, it is possible that the file has already been seen.
+      // In case of unions or map-joins, it is possible that the file has
+      // already been seen.
       // So, no need to attempt to merge the files again.
-      if ((ctx.getSeenFileSinkOps() == null) ||
-          (!ctx.getSeenFileSinkOps().contains((FileSinkOperator)nd)))  {
-        
+      if ((ctx.getSeenFileSinkOps() == null)
+          || (!ctx.getSeenFileSinkOps().contains(nd))) {
+
         // no need of merging if the move is to a local file system
-        MoveTask mvTask = (MoveTask)findMoveTask(mvTasks, (FileSinkOperator)nd);
-        if ((mvTask != null) && !mvTask.isLocal())
-        {
-          // There are separate configuration parameters to control whether to merge for a map-only job
+        MoveTask mvTask = (MoveTask) findMoveTask(mvTasks,
+            (FileSinkOperator) nd);
+        if ((mvTask != null) && !mvTask.isLocal()) {
+          // There are separate configuration parameters to control whether to
+          // merge for a map-only job
           // or for a map-reduce job
-          if ((parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
-              (((mapredWork)currTask.getWork()).getReducer() == null)) ||
-              (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
-              (((mapredWork)currTask.getWork()).getReducer() != null)))
+          if ((parseCtx.getConf().getBoolVar(
+              HiveConf.ConfVars.HIVEMERGEMAPFILES) && (((mapredWork) currTask
+              .getWork()).getReducer() == null))
+              || (parseCtx.getConf().getBoolVar(
+                  HiveConf.ConfVars.HIVEMERGEMAPREDFILES) && (((mapredWork) currTask
+                  .getWork()).getReducer() != null))) {
             chDir = true;
+          }
         }
       }
     }
 
     String finalName = processFS(nd, stack, opProcCtx, chDir);
-    
+
     // If it is a map-only job, insert a new task to do the concatenation
     if (chDir && (finalName != null)) {
-      createMergeJob((FileSinkOperator)nd, ctx, finalName);
+      createMergeJob((FileSinkOperator) nd, ctx, finalName);
     }
-    
+
     return null;
   }
-  
-  private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) {
+
+  private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx,
+      String finalName) {
     Task<? extends Serializable> currTask = ctx.getCurrTask();
     RowSchema fsRS = fsOp.getSchema();
-    
+
     // create a reduce Sink operator - key is the first column
     ArrayList<exprNodeDesc> keyCols = new ArrayList<exprNodeDesc>();
-    keyCols.add(TypeCheckProcFactory.DefaultExprProcessor.getFuncExprNodeDesc("rand"));
+    keyCols.add(TypeCheckProcFactory.DefaultExprProcessor
+        .getFuncExprNodeDesc("rand"));
 
     ArrayList<exprNodeDesc> valueCols = new ArrayList<exprNodeDesc>();
     for (ColumnInfo ci : fsRS.getSignature()) {
-      valueCols.add(new exprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci.getTabAlias(),
-          ci.getIsPartitionCol()));
+      valueCols.add(new exprNodeColumnDesc(ci.getType(), ci.getInternalName(),
+          ci.getTabAlias(), ci.getIsPartitionCol()));
     }
 
     // create a dummy tableScan operator
-    Operator<? extends Serializable> ts_op = 
-      OperatorFactory.get(tableScanDesc.class, fsRS);
+    Operator<? extends Serializable> ts_op = OperatorFactory.get(
+        tableScanDesc.class, fsRS);
 
     ArrayList<String> outputColumns = new ArrayList<String>();
-    for (int i = 0; i < valueCols.size(); i++)
+    for (int i = 0; i < valueCols.size(); i++) {
       outputColumns.add(SemanticAnalyzer.getColumnInternalName(i));
-    
-    reduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(new ArrayList<exprNodeDesc>(), valueCols, 
-                                                        outputColumns, false, -1, -1, -1); 
-    ReduceSinkOperator rsOp = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
+    }
+
+    reduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(
+        new ArrayList<exprNodeDesc>(), valueCols, outputColumns, false, -1, -1,
+        -1);
+    OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
     mapredWork cplan = GenMapRedUtils.getMapRedWork();
     ParseContext parseCtx = ctx.getParseCtx();
 
-    Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx.getConf());
+    Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx
+        .getConf());
     fileSinkDesc fsConf = fsOp.getConf();
-    
+
     // Add the extract operator to get the value fields
     RowResolver out_rwsch = new RowResolver();
-    RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRR();
+    RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp)
+        .getRR();
     Integer pos = Integer.valueOf(0);
-    for(ColumnInfo colInfo: interim_rwsch.getColumnInfos()) {
-      String [] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
-      out_rwsch.put(info[0], info[1],
-                    new ColumnInfo(pos.toString(), colInfo.getType(), info[0], 
-                          colInfo.getIsPartitionCol()));
+    for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
+      String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
+      out_rwsch.put(info[0], info[1], new ColumnInfo(pos.toString(), colInfo
+          .getType(), info[0], colInfo.getIsPartitionCol()));
       pos = Integer.valueOf(pos.intValue() + 1);
     }
 
-    Operator extract = 
-      OperatorFactory.getAndMakeChild(
-        new extractDesc(new exprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, 
-                                Utilities.ReduceField.VALUE.toString(), "", false)),
-        new RowSchema(out_rwsch.getColumnInfos()));
-    
-    tableDesc ts = (tableDesc)fsConf.getTableInfo().clone();
-    fsConf.getTableInfo().getProperties().remove(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
-    FileSinkOperator newOutput = 
-      (FileSinkOperator)OperatorFactory.getAndMakeChild(
-         new fileSinkDesc(finalName, ts, 
-                          parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)),
-         fsRS, extract);
+    Operator extract = OperatorFactory.getAndMakeChild(new extractDesc(
+        new exprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+            Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
+        out_rwsch.getColumnInfos()));
+
+    tableDesc ts = (tableDesc) fsConf.getTableInfo().clone();
+    fsConf
+        .getTableInfo()
+        .getProperties()
+        .remove(
+            org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+    FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory
+        .getAndMakeChild(new fileSinkDesc(finalName, ts, parseCtx.getConf()
+            .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)), fsRS, extract);
 
     cplan.setReducer(extract);
     ArrayList<String> aliases = new ArrayList<String>();
     aliases.add(fsConf.getDirName());
     cplan.getPathToAliases().put(fsConf.getDirName(), aliases);
-    cplan.getAliasToWork().put(fsConf.getDirName(), ts_op);    
-    cplan.getPathToPartitionInfo().put(fsConf.getDirName(), new partitionDesc(fsConf.getTableInfo(), null));
+    cplan.getAliasToWork().put(fsConf.getDirName(), ts_op);
+    cplan.getPathToPartitionInfo().put(fsConf.getDirName(),
+        new partitionDesc(fsConf.getTableInfo(), null));
     cplan.setNumReduceTasks(-1);
-    
-    moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false);
-    Task<? extends Serializable> dummyMergeTask = TaskFactory.get(dummyMv, ctx.getConf());
+
+    moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc(fsOp
+        .getConf().getDirName(), finalName, true, null, null), false);
+    Task<? extends Serializable> dummyMergeTask = TaskFactory.get(dummyMv, ctx
+        .getConf());
     List<Serializable> listWorks = new ArrayList<Serializable>();
     listWorks.add(dummyMv);
     listWorks.add(mergeTask.getWork());
     ConditionalWork cndWork = new ConditionalWork(listWorks);
-    
-    ConditionalTask cndTsk = (ConditionalTask)TaskFactory.get(cndWork, ctx.getConf());
+
+    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, ctx
+        .getConf());
     List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
     listTasks.add(dummyMergeTask);
     listTasks.add(mergeTask);
     cndTsk.setListTasks(listTasks);
-    
+
     cndTsk.setResolver(new ConditionalResolverMergeFiles());
-    cndTsk.setResolverCtx(new ConditionalResolverMergeFilesCtx(listTasks, fsOp.getConf().getDirName()));
-    
+    cndTsk.setResolverCtx(new ConditionalResolverMergeFilesCtx(listTasks, fsOp
+        .getConf().getDirName()));
+
     currTask.addDependentTask(cndTsk);
-    
+
     List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
     Task<? extends Serializable> mvTask = findMoveTask(mvTasks, newOutput);
-    
+
     if (mvTask != null) {
-      for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+      for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
         tsk.addDependentTask(mvTask);
+      }
     }
   }
- 
-  private Task<? extends Serializable> findMoveTask(List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
+
+  private Task<? extends Serializable> findMoveTask(
+      List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
     // find the move task
     for (Task<? extends Serializable> mvTsk : mvTasks) {
-      moveWork mvWork = (moveWork)mvTsk.getWork();
+      moveWork mvWork = (moveWork) mvTsk.getWork();
       String srcDir = null;
-      if (mvWork.getLoadFileWork() != null) 
+      if (mvWork.getLoadFileWork() != null) {
         srcDir = mvWork.getLoadFileWork().getSourceDir();
-      else if (mvWork.getLoadTableWork() != null)
+      } else if (mvWork.getLoadTableWork() != null) {
         srcDir = mvWork.getLoadTableWork().getSourceDir();
-      
-      if ((srcDir != null) && (srcDir.equalsIgnoreCase(fsOp.getConf().getDirName())))
+      }
+
+      if ((srcDir != null)
+          && (srcDir.equalsIgnoreCase(fsOp.getConf().getDirName()))) {
         return mvTsk;
+      }
     }
-     
+
     return null;
   }
-  
-  private String processFS(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, boolean chDir) 
-    throws SemanticException {
-    
+
+  private String processFS(Node nd, Stack<Node> stack,
+      NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
+
     // Is it the dummy file sink after the mapjoin
-    FileSinkOperator fsOp = (FileSinkOperator)nd;
-    if ((fsOp.getParentOperators().size() == 1) && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator))
+    FileSinkOperator fsOp = (FileSinkOperator) nd;
+    if ((fsOp.getParentOperators().size() == 1)
+        && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator)) {
       return null;
+    }
 
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
-    if (seenFSOps == null) 
+    if (seenFSOps == null) {
       seenFSOps = new ArrayList<FileSinkOperator>();
-    if (!seenFSOps.contains(fsOp))
+    }
+    if (!seenFSOps.contains(fsOp)) {
       seenFSOps.add(fsOp);
+    }
     ctx.setSeenFileSinkOps(seenFSOps);
 
     Task<? extends Serializable> currTask = ctx.getCurrTask();
-    
+
     // If the directory needs to be changed, send the new directory
     String dest = null;
 
@@ -256,27 +283,30 @@
       ParseContext parseCtx = ctx.getParseCtx();
       Context baseCtx = parseCtx.getContext();
       String tmpDir = baseCtx.getMRTmpFileURI();
-      
+
       fsOp.getConf().setDirName(tmpDir);
     }
-    
-    boolean ret = false;
+
     Task<? extends Serializable> mvTask = null;
-    
-    if (!chDir)
+
+    if (!chDir) {
       mvTask = findMoveTask(ctx.getMvTask(), fsOp);
-    
+    }
+
     Operator<? extends Serializable> currTopOp = ctx.getCurrTopOp();
     String currAliasId = ctx.getCurrAliasId();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+        .getOpTaskMap();
     List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
-    List<Task<? extends Serializable>>  rootTasks = ctx.getRootTasks();
+    List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
 
     // Set the move task to be dependent on the current task
-    if (mvTask != null) 
-      ret = currTask.addDependentTask(mvTask);
-    
-    // In case of multi-table insert, the path to alias mapping is needed for all the sources. Since there is no
+    if (mvTask != null) {
+      currTask.addDependentTask(mvTask);
+    }
+
+    // In case of multi-table insert, the path to alias mapping is needed for
+    // all the sources. Since there is no
     // reducer, treat it as a plan with null reducer
     // If it is a map-only job, the task needs to be processed
     if (currTopOp != null) {
@@ -284,19 +314,20 @@
       if (mapTask == null) {
         assert (!seenOps.contains(currTopOp));
         seenOps.add(currTopOp);
-        GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, (mapredWork) currTask.getWork(), false, ctx);
+        GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
+            (mapredWork) currTask.getWork(), false, ctx);
         opTaskMap.put(null, currTask);
         rootTasks.add(currTask);
-      }
-      else {
+      } else {
         if (!seenOps.contains(currTopOp)) {
           seenOps.add(currTopOp);
-          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, (mapredWork) mapTask.getWork(), false, ctx);
+          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
+              (mapredWork) mapTask.getWork(), false, ctx);
         }
-        // mapTask and currTask should be merged by a join/union operator 
+        // mapTask and currTask should be merged by a join/union operator
         // (e.g., GenMRUnion1) which has multiple topOps.
-        assert mapTask == currTask :
-               "mapTask.id = " + mapTask.getId() + "; currTask.id = " + currTask.getId();
+        assert mapTask == currTask : "mapTask.id = " + mapTask.getId()
+            + "; currTask.id = " + currTask.getId();
       }
 
       return dest;
@@ -304,30 +335,31 @@
     }
 
     UnionOperator currUnionOp = ctx.getCurrUnionOp();
-    
-    if  (currUnionOp != null) {
+
+    if (currUnionOp != null) {
       opTaskMap.put(null, currTask);
       GenMapRedUtils.initUnionPlan(ctx, currTask, false);
       return dest;
     }
-    
+
     MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp();
-    
-    if  (currMapJoinOp != null) {
+
+    if (currMapJoinOp != null) {
       opTaskMap.put(null, currTask);
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(currMapJoinOp);
       mapredWork plan = (mapredWork) currTask.getWork();
 
       String taskTmpDir = mjCtx.getTaskTmpDir();
-      tableDesc tt_desc = mjCtx.getTTDesc(); 
+      tableDesc tt_desc = mjCtx.getTTDesc();
       assert plan.getPathToAliases().get(taskTmpDir) == null;
       plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
       plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-      plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+      plan.getPathToPartitionInfo().put(taskTmpDir,
+          new partitionDesc(tt_desc, null));
       plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
       return dest;
     }
-    
+
     return dest;
   }
 }
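
None of the reformatting above changes what GenMRFileSink1 does: when hive.merge.mapfiles or hive.merge.mapredfiles is enabled and the move is not to the local file system, createMergeJob attaches a ConditionalTask to the current task with two alternatives, a plain move and an extra merge map-reduce job, and ConditionalResolverMergeFiles picks between them at run time. A minimal, self-contained sketch of that conditional pattern (the types below are simplified stand-ins, not the real Hive classes, and the size-based rule is only an assumed stand-in for the resolver's actual decision):

    import java.util.Arrays;
    import java.util.List;

    public class MergeDecisionSketch {
        interface Task { void run(); }

        // Stand-in for ConditionalResolverMergeFiles: at run time, keep the
        // plain move when the job's output files are already large enough,
        // otherwise run the extra merge job that concatenates them.
        static Task resolve(List<Task> alternatives, long avgFileSize, long threshold) {
            return avgFileSize >= threshold ? alternatives.get(0) : alternatives.get(1);
        }

        public static void main(String[] args) {
            Task move  = () -> System.out.println("move output to its final location as-is");
            Task merge = () -> System.out.println("run a map-reduce job to concatenate small files");
            // 16 MB average output against a 256 MB target: the merge branch fires.
            resolve(Arrays.asList(move, merge), 16L << 20, 256L << 20).run();
        }
    }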

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java Thu Jan 21 10:37:58 2010
@@ -23,11 +23,11 @@
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
  * Processor for the rule - no specific rule fired
@@ -38,17 +38,23 @@
   }
 
   /**
-   * Reduce Scan encountered 
-   * @param nd the reduce sink operator encountered
-   * @param procCtx context
+   * Reduce Scan encountered
+   * 
+   * @param nd
+   *          the reduce sink operator encountered
+   * @param procCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
-    GenMRProcContext ctx = (GenMRProcContext)procCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-    GenMapRedCtx mapredCtx = mapCurrCtx.get((Operator<? extends Serializable>)stack.get(stack.size()-2));
-    mapCurrCtx.put((Operator<? extends Serializable>)nd, 
-        new GenMapRedCtx(mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx.getCurrAliasId()));
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
+    mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+        mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
+            .getCurrAliasId()));
     return null;
   }
 }
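
GenMROperator is the catch-all rule: it copies the parent's GenMapRedCtx onto the current node. The parent is fetched as stack.get(stack.size() - 2) because the walker keeps the whole root-to-current path on the stack, with the node being processed on top. A tiny runnable illustration of that indexing, with plain strings standing in for operators:

    import java.util.Stack;

    public class WalkerStackSketch {
        public static void main(String[] args) {
            Stack<String> path = new Stack<>();
            path.push("TS");   // table scan: root of the walk
            path.push("SEL");  // select
            path.push("FS");   // file sink: current node, top of the stack

            // The element just below the top is the parent on the walk path.
            String current = path.peek();
            String parent  = path.get(path.size() - 2);
            System.out.println(current + " was reached through " + parent);
        }
    }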

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,19 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.LinkedHashMap;
-import java.util.List;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Set;
-import java.io.Serializable;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -40,32 +38,35 @@
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 
 /**
- * Processor Context for creating map reduce task. Walk the tree in a DFS manner and process the nodes. Some state is 
- * maintained about the current nodes visited so far.
+ * Processor Context for creating map reduce task. Walk the tree in a DFS manner
+ * and process the nodes. Some state is maintained about the current nodes
+ * visited so far.
  */
 public class GenMRProcContext implements NodeProcessorCtx {
 
-  /** 
-   * GenMapRedCtx is used to keep track of the current state. 
+  /**
+   * GenMapRedCtx is used to keep track of the current state.
    */
   public static class GenMapRedCtx {
-    Task<? extends Serializable>         currTask;
-    Operator<? extends Serializable>     currTopOp;
-    String                               currAliasId;
-    
-    public GenMapRedCtx() {  
+    Task<? extends Serializable> currTask;
+    Operator<? extends Serializable> currTopOp;
+    String currAliasId;
+
+    public GenMapRedCtx() {
     }
-    
+
     /**
-     * @param currTask    the current task
-     * @param currTopOp   the current top operator being traversed
-     * @param currAliasId the current alias for the top operator
+     * @param currTask
+     *          the current task
+     * @param currTopOp
+     *          the current top operator being traversed
+     * @param currAliasId
+     *          the current alias for the top operator
      */
-    public GenMapRedCtx (Task<? extends Serializable>         currTask,
-                         Operator<? extends Serializable>     currTopOp,
-                         String                               currAliasId) {
-      this.currTask    = currTask;
-      this.currTopOp   = currTopOp;
+    public GenMapRedCtx(Task<? extends Serializable> currTask,
+        Operator<? extends Serializable> currTopOp, String currAliasId) {
+      this.currTask = currTask;
+      this.currTopOp = currTopOp;
       this.currAliasId = currAliasId;
     }
 
@@ -92,24 +93,24 @@
   }
 
   public static class GenMRUnionCtx {
-    Task<? extends Serializable>         uTask;
-    List<String>                         taskTmpDir;
-    List<tableDesc>                      tt_desc; 
+    Task<? extends Serializable> uTask;
+    List<String> taskTmpDir;
+    List<tableDesc> tt_desc;
 
-    public GenMRUnionCtx() { 
+    public GenMRUnionCtx() {
       uTask = null;
       taskTmpDir = new ArrayList<String>();
-      tt_desc = new ArrayList<tableDesc>(); 
+      tt_desc = new ArrayList<tableDesc>();
     }
 
-    public Task<? extends Serializable> getUTask() { 
+    public Task<? extends Serializable> getUTask() {
       return uTask;
     }
 
-    public void setUTask(Task<? extends Serializable> uTask) { 
+    public void setUTask(Task<? extends Serializable> uTask) {
       this.uTask = uTask;
     }
-    
+
     public void addTaskTmpDir(String taskTmpDir) {
       this.taskTmpDir.add(taskTmpDir);
     }
@@ -128,16 +129,16 @@
   }
 
   public static class GenMRMapJoinCtx {
-    String                            taskTmpDir;
-    tableDesc                         tt_desc; 
-    Operator<? extends Serializable>  rootMapJoinOp;
-    MapJoinOperator                   oldMapJoin;   
-    
-    public GenMRMapJoinCtx() { 
-      taskTmpDir    = null;
-      tt_desc       = null;
+    String taskTmpDir;
+    tableDesc tt_desc;
+    Operator<? extends Serializable> rootMapJoinOp;
+    MapJoinOperator oldMapJoin;
+
+    public GenMRMapJoinCtx() {
+      taskTmpDir = null;
+      tt_desc = null;
       rootMapJoinOp = null;
-      oldMapJoin    = null;
+      oldMapJoin = null;
     }
 
     /**
@@ -146,14 +147,15 @@
      * @param rootMapJoinOp
      * @param oldMapJoin
      */
-    public GenMRMapJoinCtx(String taskTmpDir, tableDesc tt_desc, 
-        Operator<? extends Serializable> rootMapJoinOp, MapJoinOperator oldMapJoin) {
-      this.taskTmpDir    = taskTmpDir;
-      this.tt_desc       = tt_desc;
+    public GenMRMapJoinCtx(String taskTmpDir, tableDesc tt_desc,
+        Operator<? extends Serializable> rootMapJoinOp,
+        MapJoinOperator oldMapJoin) {
+      this.taskTmpDir = taskTmpDir;
+      this.tt_desc = tt_desc;
       this.rootMapJoinOp = rootMapJoinOp;
-      this.oldMapJoin    = oldMapJoin;
+      this.oldMapJoin = oldMapJoin;
     }
-    
+
     public void setTaskTmpDir(String taskTmpDir) {
       this.taskTmpDir = taskTmpDir;
     }
@@ -178,7 +180,8 @@
     }
 
     /**
-     * @param rootMapJoinOp the rootMapJoinOp to set
+     * @param rootMapJoinOp
+     *          the rootMapJoinOp to set
      */
     public void setRootMapJoinOp(Operator<? extends Serializable> rootMapJoinOp) {
       this.rootMapJoinOp = rootMapJoinOp;
@@ -192,7 +195,8 @@
     }
 
     /**
-     * @param oldMapJoin the oldMapJoin to set
+     * @param oldMapJoin
+     *          the oldMapJoin to set
      */
     public void setOldMapJoin(MapJoinOperator oldMapJoin) {
       this.oldMapJoin = oldMapJoin;
@@ -201,174 +205,188 @@
 
   private HiveConf conf;
   private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
-  private HashMap<UnionOperator, GenMRUnionCtx>      unionTaskMap;
-  private HashMap<MapJoinOperator, GenMRMapJoinCtx>  mapJoinTaskMap;
+  private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
+  private HashMap<MapJoinOperator, GenMRMapJoinCtx> mapJoinTaskMap;
   private List<Operator<? extends Serializable>> seenOps;
-  private List<FileSinkOperator>                 seenFileSinkOps;
+  private List<FileSinkOperator> seenFileSinkOps;
 
-  private ParseContext                          parseCtx;
-  private List<Task<? extends Serializable>>    mvTask;
-  private List<Task<? extends Serializable>>    rootTasks;
-
-  private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx; 
-  private Task<? extends Serializable>         currTask;
-  private Operator<? extends Serializable>     currTopOp;
-  private UnionOperator                        currUnionOp;
-  private MapJoinOperator                      currMapJoinOp;
-  private String                               currAliasId;
+  private ParseContext parseCtx;
+  private List<Task<? extends Serializable>> mvTask;
+  private List<Task<? extends Serializable>> rootTasks;
+
+  private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx;
+  private Task<? extends Serializable> currTask;
+  private Operator<? extends Serializable> currTopOp;
+  private UnionOperator currUnionOp;
+  private MapJoinOperator currMapJoinOp;
+  private String currAliasId;
   private List<Operator<? extends Serializable>> rootOps;
-  
+
   /**
-   * Set of read entities. This list is generated by the walker and is 
-   * passed to the hooks.
+   * Set of read entities. This list is generated by the walker and is passed to
+   * the hooks.
    */
-  private Set<ReadEntity>                     inputs;
+  private Set<ReadEntity> inputs;
   /**
-   * Set of write entities. This list is generated by the walker and is
-   * passed to the hooks.
-   */
-  private Set<WriteEntity>                    outputs;
-  
-  public GenMRProcContext() {  
-  }
-  
-  /**
-   * @param conf       hive configuration
-   * @param opTaskMap  reducer to task mapping
-   * @param seenOps    operator already visited
-   * @param parseCtx   current parse context
-   * @param rootTasks  root tasks for the plan
-   * @param mvTask     the final move task
-   * @param mapCurrCtx operator to task mappings
-   * @param inputs     the set of input tables/partitions generated by the walk
-   * @param outputs    the set of destinations generated by the walk
-   */
-  public GenMRProcContext (
-    HiveConf conf,
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
-    List<Operator<? extends Serializable>> seenOps,
-    ParseContext                           parseCtx,
-    List<Task<? extends Serializable>>     mvTask,
-    List<Task<? extends Serializable>>     rootTasks,
-    LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
-    Set<ReadEntity> inputs,
-    Set<WriteEntity> outputs) 
-  {
-    this.conf       = conf;
-    this.opTaskMap  = opTaskMap;
-    this.seenOps    = seenOps;
-    this.mvTask     = mvTask;
-    this.parseCtx   = parseCtx;
-    this.rootTasks  = rootTasks;
+   * Set of write entities. This list is generated by the walker and is passed
+   * to the hooks.
+   */
+  private Set<WriteEntity> outputs;
+
+  public GenMRProcContext() {
+  }
+
+  /**
+   * @param conf
+   *          hive configuration
+   * @param opTaskMap
+   *          reducer to task mapping
+   * @param seenOps
+   *          operator already visited
+   * @param parseCtx
+   *          current parse context
+   * @param rootTasks
+   *          root tasks for the plan
+   * @param mvTask
+   *          the final move task
+   * @param mapCurrCtx
+   *          operator to task mappings
+   * @param inputs
+   *          the set of input tables/partitions generated by the walk
+   * @param outputs
+   *          the set of destinations generated by the walk
+   */
+  public GenMRProcContext(
+      HiveConf conf,
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
+      List<Operator<? extends Serializable>> seenOps, ParseContext parseCtx,
+      List<Task<? extends Serializable>> mvTask,
+      List<Task<? extends Serializable>> rootTasks,
+      LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+    this.conf = conf;
+    this.opTaskMap = opTaskMap;
+    this.seenOps = seenOps;
+    this.mvTask = mvTask;
+    this.parseCtx = parseCtx;
+    this.rootTasks = rootTasks;
     this.mapCurrCtx = mapCurrCtx;
     this.inputs = inputs;
     this.outputs = outputs;
-    currTask        = null;
-    currTopOp       = null;
-    currUnionOp     = null;
-    currMapJoinOp   = null;
-    currAliasId     = null;
-    rootOps         = new ArrayList<Operator<? extends Serializable>>();
+    currTask = null;
+    currTopOp = null;
+    currUnionOp = null;
+    currMapJoinOp = null;
+    currAliasId = null;
+    rootOps = new ArrayList<Operator<? extends Serializable>>();
     rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
     mapJoinTaskMap = new HashMap<MapJoinOperator, GenMRMapJoinCtx>();
   }
 
   /**
-   * @return  reducer to task mapping
+   * @return reducer to task mapping
    */
   public HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> getOpTaskMap() {
     return opTaskMap;
   }
 
   /**
-   * @param opTaskMap  reducer to task mapping
+   * @param opTaskMap
+   *          reducer to task mapping
    */
-  public void setOpTaskMap(HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
+  public void setOpTaskMap(
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
     this.opTaskMap = opTaskMap;
   }
 
   /**
-   * @return  operators already visited
+   * @return operators already visited
    */
   public List<Operator<? extends Serializable>> getSeenOps() {
     return seenOps;
   }
 
   /**
-   * @return  file operators already visited
+   * @return file operators already visited
    */
   public List<FileSinkOperator> getSeenFileSinkOps() {
     return seenFileSinkOps;
   }
 
   /**
-   * @param seenOps    operators already visited
+   * @param seenOps
+   *          operators already visited
    */
   public void setSeenOps(List<Operator<? extends Serializable>> seenOps) {
     this.seenOps = seenOps;
   }
 
   /**
-   * @param seenFileSinkOps file sink operators already visited
+   * @param seenFileSinkOps
+   *          file sink operators already visited
    */
   public void setSeenFileSinkOps(List<FileSinkOperator> seenFileSinkOps) {
     this.seenFileSinkOps = seenFileSinkOps;
   }
 
   /**
-   * @return  top operators for tasks
+   * @return top operators for tasks
    */
   public List<Operator<? extends Serializable>> getRootOps() {
     return rootOps;
   }
 
   /**
-   * @param rootOps    top operators for tasks
+   * @param rootOps
+   *          top operators for tasks
    */
   public void setRootOps(List<Operator<? extends Serializable>> rootOps) {
     this.rootOps = rootOps;
   }
 
   /**
-   * @return   current parse context
+   * @return current parse context
    */
   public ParseContext getParseCtx() {
     return parseCtx;
   }
 
   /**
-   * @param parseCtx   current parse context
+   * @param parseCtx
+   *          current parse context
    */
   public void setParseCtx(ParseContext parseCtx) {
     this.parseCtx = parseCtx;
   }
 
   /**
-   * @return     the final move task
+   * @return the final move task
    */
   public List<Task<? extends Serializable>> getMvTask() {
     return mvTask;
   }
 
   /**
-   * @param mvTask     the final move task
+   * @param mvTask
+   *          the final move task
    */
   public void setMvTask(List<Task<? extends Serializable>> mvTask) {
     this.mvTask = mvTask;
   }
 
   /**
-   * @return  root tasks for the plan
+   * @return root tasks for the plan
    */
-  public List<Task<? extends Serializable>>  getRootTasks() {
+  public List<Task<? extends Serializable>> getRootTasks() {
     return rootTasks;
   }
 
   /**
-   * @param rootTasks  root tasks for the plan
+   * @param rootTasks
+   *          root tasks for the plan
    */
-  public void setRootTasks(List<Task<? extends Serializable>>  rootTasks) {
+  public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
     this.rootTasks = rootTasks;
   }
 
@@ -380,23 +398,26 @@
   }
 
   /**
-   * @param mapCurrCtx operator to task mappings
+   * @param mapCurrCtx
+   *          operator to task mappings
    */
-  public void setMapCurrCtx(LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
+  public void setMapCurrCtx(
+      LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
     this.mapCurrCtx = mapCurrCtx;
   }
 
   /**
    * @return current task
    */
-  public Task<? extends Serializable>  getCurrTask() {
+  public Task<? extends Serializable> getCurrTask() {
     return currTask;
   }
 
   /**
-   * @param currTask current task
+   * @param currTask
+   *          current task
    */
-  public void setCurrTask(Task<? extends Serializable>  currTask) {
+  public void setCurrTask(Task<? extends Serializable> currTask) {
     this.currTask = currTask;
   }
 
@@ -405,46 +426,50 @@
    */
   public Operator<? extends Serializable> getCurrTopOp() {
     return currTopOp;
-  }   
-   
+  }
+
   /**
-   * @param currTopOp current top operator
+   * @param currTopOp
+   *          current top operator
    */
   public void setCurrTopOp(Operator<? extends Serializable> currTopOp) {
     this.currTopOp = currTopOp;
-  }      
+  }
 
   public UnionOperator getCurrUnionOp() {
     return currUnionOp;
-  }   
-   
+  }
+
   /**
-   * @param currUnionOp current union operator
+   * @param currUnionOp
+   *          current union operator
    */
   public void setCurrUnionOp(UnionOperator currUnionOp) {
     this.currUnionOp = currUnionOp;
-  }      
+  }
 
   public MapJoinOperator getCurrMapJoinOp() {
     return currMapJoinOp;
-  }   
-   
+  }
+
   /**
-   * @param currMapJoinOp current map join operator
+   * @param currMapJoinOp
+   *          current map join operator
    */
   public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
     this.currMapJoinOp = currMapJoinOp;
-  }      
+  }
 
   /**
    * @return current top alias
    */
-  public String  getCurrAliasId() {
+  public String getCurrAliasId() {
     return currAliasId;
   }
 
   /**
-   * @param currAliasId current top alias
+   * @param currAliasId
+   *          current top alias
    */
   public void setCurrAliasId(String currAliasId) {
     this.currAliasId = currAliasId;
@@ -472,7 +497,7 @@
   public Set<ReadEntity> getInputs() {
     return inputs;
   }
-  
+
   /**
    * Get the output set.
    */
@@ -488,7 +513,8 @@
   }
 
   /**
-   * @param conf the conf to set
+   * @param conf
+   *          the conf to set
    */
   public void setConf(HiveConf conf) {
     this.conf = conf;
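
GenMRProcContext carries no logic of its own; it is one mutable bag of walk state (current task, top operator, alias id, seen operators, plus the union and map-join sub-contexts) that every rule receives as a NodeProcessorCtx and downcasts. A stripped-down sketch of that shared-context pattern (simplified types, not the real classes):

    public class ProcCtxSketch {
        interface NodeProcessorCtx { }          // marker, as in the Hive walker

        static class GenCtx implements NodeProcessorCtx {
            String currAliasId;                 // shared state across rule firings
        }

        // Every rule downcasts the common context, reads state left by earlier
        // rules, and writes state for the rules that fire after it.
        static void someRule(NodeProcessorCtx procCtx) {
            GenCtx ctx = (GenCtx) procCtx;
            ctx.currAliasId = "subq1:a";
        }

        public static void main(String[] args) {
            GenCtx ctx = new GenCtx();
            someRule(ctx);
            System.out.println("alias after rule: " + ctx.currAliasId);
        }
    }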

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Thu Jan 21 10:37:58 2010
@@ -18,20 +18,20 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.Map;
+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Stack;
-import java.io.Serializable;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
 
 /**
  * Processor for the rule - table scan followed by reduce sink
@@ -42,22 +42,28 @@
   }
 
   /**
-   * Reduce Scan encountered 
-   * @param nd the reduce sink operator encountered
-   * @param opProcCtx context
+   * Reduce Scan encountered
+   * 
+   * @param nd
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    ReduceSinkOperator op = (ReduceSinkOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
-
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
-    GenMapRedCtx mapredCtx = mapCurrCtx.get((Operator<? extends Serializable>)stack.get(stack.size()-2));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
+
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork currPlan = (mapredWork) currTask.getWork();
-    Operator<? extends Serializable> currTopOp   = mapredCtx.getCurrTopOp();
+    Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+        .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
     ctx.setCurrTopOp(currTopOp);
@@ -66,20 +72,24 @@
 
     // If the plan for this reducer does not exist, initialize the plan
     if (opMapTask == null) {
-      if (currPlan.getReducer() == null) 
+      if (currPlan.getReducer() == null) {
         GenMapRedUtils.initPlan(op, ctx);
-      else
+      } else {
         GenMapRedUtils.splitPlan(op, ctx);
+      }
     }
-    // This will happen in case of joins. The current plan can be thrown away after being merged with the
+    // This will happen in case of joins. The current plan can be thrown away
+    // after being merged with the
     // original plan
     else {
-      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
+      GenMapRedUtils
+          .joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+        ctx.getCurrAliasId()));
     return null;
   }
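
The GenMRRedSink rules in this commit all share the shape visible here: look the reduce sink's reducer up in opTaskMap, then either initialize a plan, split the current plan into a new map-reduce stage, or join into the task that already owns the reducer. A schematic, runnable rendering of GenMRRedSink1's three-way choice (the labels mirror GenMapRedUtils.initPlan/splitPlan/joinPlan; the bodies are placeholders, not the real logic):

    public class RedSinkDispatchSketch {
        static String dispatch(boolean reducerHasTask, boolean currPlanHasReducer) {
            if (!reducerHasTask) {
                return currPlanHasReducer
                    ? "splitPlan: the current task already has a reducer, start a new MR job"
                    : "initPlan: first reduce sink seen for this task";
            }
            return "joinPlan: merge the current plan into the reducer's existing task";
        }

        public static void main(String[] args) {
            System.out.println(dispatch(false, false)); // fresh single-stage plan
            System.out.println(dispatch(false, true));  // multi-stage query
            System.out.println(dispatch(true,  true));  // e.g. a join branch reaching a known reducer
        }
    }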
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Thu Jan 21 10:37:58 2010
@@ -28,8 +28,8 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
  * Processor for the rule - reduce sink followed by reduce sink
@@ -40,36 +40,44 @@
   }
 
   /**
-   * Reduce Scan encountered 
-   * @param nd the reduce sink operator encountered
-   * @param opProcCtx context
+   * Reduce Scan encountered
+   * 
+   * @param nd
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    ReduceSinkOperator op = (ReduceSinkOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask      = mapredCtx.getCurrTask();
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+        .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
     ctx.setCurrTopOp(currTopOp);
     ctx.setCurrAliasId(currAliasId);
     ctx.setCurrTask(currTask);
 
-    if (opMapTask == null)
+    if (opMapTask == null) {
       GenMapRedUtils.splitPlan(op, ctx);
-    else {
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
+    } else {
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false,
+          false);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+        ctx.getCurrAliasId()));
     return null;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Thu Jan 21 10:37:58 2010
@@ -18,26 +18,22 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.Map;
+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Stack;
-import java.io.Serializable;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
 
 /**
  * Processor for the rule - union followed by reduce sink
@@ -48,54 +44,64 @@
   }
 
   /**
-   * Reduce Scan encountered 
-   * @param nd the reduce sink operator encountered
-   * @param opProcCtx context
+   * Reduce Scan encountered
+   * 
+   * @param nd
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    ReduceSinkOperator op = (ReduceSinkOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
 
     ParseContext parseCtx = ctx.getParseCtx();
     UnionProcContext uCtx = parseCtx.getUCtx();
 
     // union was map only - no special processing needed
-    if (uCtx.isMapOnlySubq())
+    if (uCtx.isMapOnlySubq()) {
       return (new GenMRRedSink1()).process(nd, stack, opProcCtx, nodeOutputs);
+    }
 
-    // union consisted of a bunch of map-reduce jobs, and it has been split at the union
+    // union consisted of a bunch of map-reduce jobs, and it has been split at
+    // the union
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork plan = (mapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+        .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-    
+
     ctx.setCurrTask(currTask);
 
     // If the plan for this reducer does not exist, initialize the plan
     if (opMapTask == null) {
       // When the reducer is encountered for the first time
-      if (plan.getReducer() == null)
+      if (plan.getReducer() == null) {
         GenMapRedUtils.initUnionPlan(op, ctx);
-      // When union is followed by a multi-table insert
-      else
+        // When union is followed by a multi-table insert
+      } else {
         GenMapRedUtils.splitPlan(op, ctx);
+      }
     }
-    // The union is already initialized. However, the union is walked from another input
+    // The union is already initialized. However, the union is walked from
+    // another input
     // initUnionPlan is idempotent
-    else if (plan.getReducer() == reducer)
+    else if (plan.getReducer() == reducer) {
       GenMapRedUtils.initUnionPlan(op, ctx);
-    // There is a join after union. One of the branches of union has already been initialized.
-    // Initialize the current branch, and join with the original plan.
-    else {
+    } else {
       GenMapRedUtils.initUnionPlan(ctx, currTask, false);
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false,
+          false);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
-    
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+        ctx.getCurrAliasId()));
+
     // the union operator has been processed
     ctx.setCurrUnionOp(null);
     return null;
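
The comment "initUnionPlan is idempotent" is what makes the middle branch above safe: the walker can reach the same union reducer once per input branch, so re-initialization has to be a no-op. One way to get that property, sketched with a plain map in place of the real plan structures (names and mechanism here are illustrative assumptions, not Hive's actual code):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class UnionInitSketch {
        private final Map<String, String> pathToAliases = new LinkedHashMap<>();

        void initUnionPlan(String branchTmpDir) {
            // putIfAbsent makes a second visit from another union branch a no-op.
            pathToAliases.putIfAbsent(branchTmpDir, branchTmpDir);
        }

        public static void main(String[] args) {
            UnionInitSketch plan = new UnionInitSketch();
            plan.initUnionPlan("/tmp/hive/union-branch-1");
            plan.initUnionPlan("/tmp/hive/union-branch-1"); // revisited: no change
            System.out.println(plan.pathToAliases);
        }
    }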

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,20 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.Map;
+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Stack;
-import java.io.Serializable;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
 
 /**
  * Processor for the rule - map join followed by reduce sink
@@ -43,45 +42,56 @@
   }
 
   /**
-   * Reduce Scan encountered 
-   * @param nd the reduce sink operator encountered
-   * @param opProcCtx context
+   * Reduce Scan encountered
+   * 
+   * @param nd
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    ReduceSinkOperator op = (ReduceSinkOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
 
-    ParseContext parseCtx = ctx.getParseCtx();
 
-    // map-join consisted on a bunch of map-only jobs, and it has been split after the mapjoin
+    // the map-join consisted of a bunch of map-only jobs, and it has been
+    // split after the mapjoin
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     mapredWork plan = (mapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+        .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-    
+
     ctx.setCurrTask(currTask);
 
     // If the plan for this reducer does not exist, initialize the plan
     if (opMapTask == null) {
       // When the reducer is encountered for the first time
-      if (plan.getReducer() == null)
+      if (plan.getReducer() == null) {
         GenMapRedUtils.initMapJoinPlan(op, ctx, true, false, true, -1);
-      // When mapjoin is followed by a multi-table insert
-      else
+      } else {
+        // When mapjoin is followed by a multi-table insert
         GenMapRedUtils.splitPlan(op, ctx);
+      }
     }
-    // There is a join after mapjoin. One of the branches of mapjoin has already been initialized.
+    // There is a join after mapjoin. One of the branches of mapjoin has already
+    // been initialized.
     // Initialize the current branch, and join with the original plan.
     else {
       assert plan.getReducer() != reducer;
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, false);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true,
+          false);
     }
 
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
-    
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+        ctx.getCurrAliasId()));
+
     // the mapjoin operator has been processed
     ctx.setCurrMapJoinOp(null);
     return null;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Thu Jan 21 10:37:58 2010
@@ -29,9 +29,9 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
  * Processor for the rule - table scan
@@ -41,18 +41,24 @@
   }
 
   /**
-   * Table Sink encountered 
-   * @param nd the table sink operator encountered
-   * @param opProcCtx context
+   * Table Scan encountered
+   * 
+   * @param nd
+   *          the table scan operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    TableScanOperator op = (TableScanOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    TableScanOperator op = (TableScanOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
 
-    // create a dummy task 
-    Task<? extends Serializable> currTask  = TaskFactory.get(GenMapRedUtils.getMapRedWork(), parseCtx.getConf());
+    // create a dummy task
+    Task<? extends Serializable> currTask = TaskFactory.get(GenMapRedUtils
+        .getMapRedWork(), parseCtx.getConf());
     Operator<? extends Serializable> currTopOp = op;
     ctx.setCurrTask(currTask);
     ctx.setCurrTopOp(currTopOp);
@@ -71,4 +77,3 @@
   }
 
 }
-
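
The table-scan rule above is the one that seeds the walk: it creates a root task and records a (task, topOp, aliasId) triple per operator so that downstream rules can resume where the walk left off. A self-contained sketch of that bookkeeping, again with hypothetical stand-in types in place of Hive's:

    import java.util.HashMap;
    import java.util.Map;

    public class WalkContextSketch {
      static class Op { }                  // stand-in for Operator<? extends Serializable>
      static class Task { }                // stand-in for Task<? extends Serializable>

      // Mirrors GenMapRedCtx: the state a downstream rule needs to resume from.
      static class Memo {
        final Task currTask;
        final Op currTopOp;
        final String currAliasId;
        Memo(Task t, Op top, String alias) {
          currTask = t; currTopOp = top; currAliasId = alias;
        }
      }

      // Mirrors GenMRProcContext.getMapCurrCtx().
      private final Map<Op, Memo> mapCurrCtx = new HashMap<Op, Memo>();

      // What GenMRTableScan1.process does, stripped to its essentials:
      void onTableScan(Op tableScan, String aliasId) {
        Task currTask = new Task();        // stands in for TaskFactory.get(...)
        mapCurrCtx.put(tableScan, new Memo(currTask, tableScan, aliasId));
      }

      Memo resumeAt(Op op) {
        return mapCurrCtx.get(op);         // how the GenMRRedSink* rules pick up state
      }
    }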

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Thu Jan 21 10:37:58 2010
@@ -18,39 +18,37 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
-import java.util.Stack;
-import java.io.Serializable;
-import java.io.File;
 import java.util.Map;
+import java.util.Stack;
 
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 
 /**
  * Processor for the rule - TableScan followed by Union
@@ -61,41 +59,51 @@
   }
 
   /**
-   * Union Operator encountered .
-   * Currently, the algorithm is pretty simple:
-   *   If all the sub-queries are map-only, dont do anything.
-   *   However, if there is a mapjoin followed by the union, merge at the union
-   *   Otherwise, insert a FileSink on top of all the sub-queries.
-   *
+   * Union Operator encountered. Currently, the algorithm is pretty simple: if
+   * all the sub-queries are map-only, don't do anything; if there is a mapjoin
+   * followed by the union, merge at the union; otherwise, insert a FileSink on
+   * top of all the sub-queries.
+   * 
    * This can be optimized later on.
-   * @param nd the file sink operator encountered
-   * @param opProcCtx context
+   * 
+   * @param nd
+   *          the union operator encountered
+   * @param opProcCtx
+   *          context
    */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
-    UnionOperator union = (UnionOperator)nd;
-    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+      Object... nodeOutputs) throws SemanticException {
+    UnionOperator union = (UnionOperator) nd;
+    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
     UnionProcContext uCtx = parseCtx.getUCtx();
 
-    // Map-only subqueries can be optimized in future to not write to a file in future
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    // Map-only subqueries can be optimized in the future to not write to a
+    // file
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        .getMapCurrCtx();
 
-   // The plan needs to be broken only if one of the sub-queries involve a map-reduce job
+    // The plan needs to be broken only if one of the sub-queries involves a
+    // map-reduce job
     if (uCtx.isMapOnlySubq()) {
       // merge currTask from multiple topOps
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
-    	if ( opTaskMap != null && opTaskMap.size() > 0 ) {
-     		Task<? extends Serializable> tsk = opTaskMap.get(null);
-        if ( tsk != null )
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+          .getOpTaskMap();
+      if (opTaskMap != null && opTaskMap.size() > 0) {
+        Task<? extends Serializable> tsk = opTaskMap.get(null);
+        if (tsk != null) {
           ctx.setCurrTask(tsk);
+        }
       }
- 
+
       UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
       if ((uPrsCtx != null) && (uPrsCtx.getMapJoinQuery())) {
-        GenMapRedUtils.mergeMapJoinUnion(union, ctx, UnionProcFactory.getPositionParent(union, stack));
+        GenMapRedUtils.mergeMapJoinUnion(union, ctx, UnionProcFactory
+            .getPositionParent(union, stack));
+      } else {
+        mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+            ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
       }
-      else
-        mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
       return null;
     }
 
@@ -108,13 +116,15 @@
     int pos = UnionProcFactory.getPositionParent(union, stack);
 
     // is the current task a root task
-    if (uPrsCtx.getRootTask(pos) && (!ctx.getRootTasks().contains(currTask)))
+    if (uPrsCtx.getRootTask(pos) && (!ctx.getRootTasks().contains(currTask))) {
       ctx.getRootTasks().add(currTask);
+    }
 
     GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
     Task<? extends Serializable> uTask = null;
 
-    Operator<? extends Serializable> parent = union.getParentOperators().get(pos);
+    Operator<? extends Serializable> parent = union.getParentOperators().get(
+        pos);
     mapredWork uPlan = null;
 
     // union is encountered for the first time
@@ -124,10 +134,9 @@
       uTask = TaskFactory.get(uPlan, parseCtx.getConf());
       uCtxTask.setUTask(uTask);
       ctx.setUnionTask(union, uCtxTask);
-    }
-    else {
+    } else {
       uTask = uCtxTask.getUTask();
-      uPlan = (mapredWork)uTask.getWork();
+      uPlan = (mapredWork) uTask.getWork();
     }
 
     // If there is a mapjoin at position 'pos'
@@ -143,12 +152,13 @@
       assert plan.getPathToAliases().get(taskTmpDir) == null;
       plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
       plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-      plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+      plan.getPathToPartitionInfo().put(taskTmpDir,
+          new partitionDesc(tt_desc, null));
       plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
     }
 
-    tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(
-          PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+    tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+        .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
 
     // generate the temporary file
     Context baseCtx = parseCtx.getContext();
@@ -158,15 +168,14 @@
     uCtxTask.addTaskTmpDir(taskTmpDir);
     uCtxTask.addTTDesc(tt_desc);
 
-    // The union task is empty. The files created for all the inputs are assembled in the
+    // The union task is empty. The files created for all the inputs are
+    // assembled in the
     // union context and later used to initialize the union plan
 
     // Create a file sink operator for this file name
-    Operator<? extends Serializable> fs_op =
-      OperatorFactory.get
-      (new fileSinkDesc(taskTmpDir, tt_desc,
-                        parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
-       parent.getSchema());
+    Operator<? extends Serializable> fs_op = OperatorFactory.get(
+        new fileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
+            HiveConf.ConfVars.COMPRESSINTERMEDIATE)), parent.getSchema());
 
     assert parent.getChildOperators().size() == 1;
     parent.getChildOperators().set(0, fs_op);
@@ -178,14 +187,17 @@
     currTask.addDependentTask(uTask);
 
     // If it is map-only task, add the files to be processed
-    if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos))
-      GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(), (mapredWork) currTask.getWork(), false, ctx);
+    if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
+      GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(),
+          (mapredWork) currTask.getWork(), false, ctx);
+    }
 
     ctx.setCurrTask(uTask);
     ctx.setCurrAliasId(null);
     ctx.setCurrTopOp(null);
 
-    mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+    mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(ctx
+        .getCurrTask(), null, null));
 
     return null;
   }
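
To break the plan at the union, each branch that involves a map-reduce job gets a temporary-file FileSink spliced in between its parent and the union, and the union task is made dependent on the branch's task. A self-contained sketch of the splice on a toy operator tree (hypothetical Op stand-in; the real code also registers the temp dir and table descriptor with the union context):

    import java.util.ArrayList;
    import java.util.List;

    public class UnionSpliceSketch {
      static class Op {
        final List<Op> parents = new ArrayList<Op>();
        final List<Op> children = new ArrayList<Op>();
        final String name;
        Op(String name) { this.name = name; }
      }

      // Mirrors parent.getChildOperators().set(0, fs_op) in the hunk above:
      // the branch now ends in a file sink writing to a per-branch temp dir,
      // and the union task reads that directory instead of the operator edge.
      static Op spliceFileSink(Op parent, String taskTmpDir) {
        Op fsOp = new Op("FS[" + taskTmpDir + "]");
        assert parent.children.size() == 1;  // the union was the only child
        parent.children.set(0, fsOp);
        fsOp.parents.add(parent);
        return fsOp;
      }

      public static void main(String[] args) {
        Op parent = new Op("SEL");
        Op union = new Op("UNION");
        parent.children.add(union);
        union.parents.add(parent);
        spliceFileSink(parent, "/tmp/hive/scratch/10001");  // hypothetical path
        System.out.println(parent.children.get(0).name);    // FS[/tmp/hive/scratch/10001]
      }
    }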