Posted to commits@hive.apache.org by na...@apache.org on 2012/08/29 19:44:02 UTC

svn commit: r1378659 [2/4] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/index/compact/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/lib/ java/org/apach...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 /**
@@ -40,15 +40,15 @@ import org.apache.hadoop.hive.ql.plan.Se
  */
 public class ColumnPrunerProcCtx implements NodeProcessorCtx {
 
-  private final Map<Operator<? extends Serializable>, List<String>> prunedColLists;
+  private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists;
 
-  private final HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+  private final HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap;
 
   private final Map<CommonJoinOperator, Map<Byte, List<String>>> joinPrunedColLists;
 
   public ColumnPrunerProcCtx(
-      HashMap<Operator<? extends Serializable>, OpParseContext> opToParseContextMap) {
-    prunedColLists = new HashMap<Operator<? extends Serializable>, List<String>>();
+      HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseContextMap) {
+    prunedColLists = new HashMap<Operator<? extends OperatorDesc>, List<String>>();
     opToParseCtxMap = opToParseContextMap;
     joinPrunedColLists = new HashMap<CommonJoinOperator, Map<Byte, List<String>>>();
   }
@@ -60,15 +60,15 @@ public class ColumnPrunerProcCtx impleme
   /**
    * @return the prunedColLists
    */
-  public List<String> getPrunedColList(Operator<? extends Serializable> op) {
+  public List<String> getPrunedColList(Operator<? extends OperatorDesc> op) {
     return prunedColLists.get(op);
   }
 
-  public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
+  public HashMap<Operator<? extends OperatorDesc>, OpParseContext> getOpToParseCtxMap() {
     return opToParseCtxMap;
   }
 
-  public Map<Operator<? extends Serializable>, List<String>> getPrunedColLists() {
+  public Map<Operator<? extends OperatorDesc>, List<String>> getPrunedColLists() {
     return prunedColLists;
   }
 
@@ -77,17 +77,17 @@ public class ColumnPrunerProcCtx impleme
    * RowResolver and are different from the external column names) that are
    * needed in the subtree. These columns eventually have to be selected from
    * the table scan.
-   * 
+   *
    * @param curOp
    *          The root of the operator subtree.
    * @return List<String> of the internal column names.
    * @throws SemanticException
    */
-  public List<String> genColLists(Operator<? extends Serializable> curOp)
+  public List<String> genColLists(Operator<? extends OperatorDesc> curOp)
       throws SemanticException {
     List<String> colList = new ArrayList<String>();
     if (curOp.getChildOperators() != null) {
-      for (Operator<? extends Serializable> child : curOp.getChildOperators()) {
+      for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) {
         if (child instanceof CommonJoinOperator) {
           int tag = child.getParentOperators().indexOf(curOp);
           List<String> prunList = joinPrunedColLists.get(child).get((byte) tag);
@@ -105,7 +105,7 @@ public class ColumnPrunerProcCtx impleme
    * Creates the list of internal column names from select expressions in a
    * select operator. This function is used for the select operator instead of
    * the genColLists function (which is used by the rest of the operators).
-   * 
+   *
    * @param op
    *          The select operator.
    * @return List<String> of the internal column names.
@@ -122,7 +122,7 @@ public class ColumnPrunerProcCtx impleme
 
   /**
    * Creates the list of internal column names for select * expressions.
-   * 
+   *
    * @param op
    *          The select operator.
    * @param colList

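The hunks above narrow the Operator type parameter's bound from Serializable to OperatorDesc throughout the column pruner context. A minimal sketch of the resulting pattern follows; OperatorDesc, SelectDesc, and Operator here are simplified stand-ins for illustration, not the real Hive classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-ins for the Hive types touched by this commit.
    interface OperatorDesc {}                    // marker for operator descriptors
    class SelectDesc implements OperatorDesc {}  // one concrete descriptor
    class Operator<T extends OperatorDesc> {     // operator parameterized by its descriptor
      private final T conf;
      Operator(T conf) { this.conf = conf; }
      T getConf() { return conf; }
    }

    public class PrunedColsSketch {
      public static void main(String[] args) {
        // After this commit the pruner maps are keyed by
        // Operator<? extends OperatorDesc> rather than Operator<? extends Serializable>,
        // so only operator descriptors (not arbitrary Serializables) can appear.
        Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists =
            new HashMap<Operator<? extends OperatorDesc>, List<String>>();

        Operator<SelectDesc> sel = new Operator<SelectDesc>(new SelectDesc());
        prunedColLists.put(sel, new ArrayList<String>(Arrays.asList("_col0", "_col1")));
        System.out.println(prunedColLists.get(sel));  // prints [_col0, _col1]
      }
    }

Since OperatorDesc itself still extends Serializable in Hive, plan serialization appears unaffected; the tighter bound just rejects non-descriptor payloads at compile time.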
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
@@ -154,8 +154,8 @@ public final class ColumnPrunerProcFacto
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
-          cppCtx.genColLists((Operator<? extends Serializable>) nd));
+      cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
+          cppCtx.genColLists((Operator<? extends OperatorDesc>) nd));
 
       return null;
     }
@@ -180,8 +180,8 @@ public final class ColumnPrunerProcFacto
       TableScanOperator scanOp = (TableScanOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
       List<String> cols = cppCtx
-          .genColLists((Operator<? extends Serializable>) nd);
-      cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
+          .genColLists((Operator<? extends OperatorDesc>) nd);
+      cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
           cols);
       ArrayList<Integer> needed_columns = new ArrayList<Integer>();
       RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
@@ -241,13 +241,13 @@ public final class ColumnPrunerProcFacto
         Object... nodeOutputs) throws SemanticException {
       ReduceSinkOperator op = (ReduceSinkOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = cppCtx
+      HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap = cppCtx
           .getOpToParseCtxMap();
       RowResolver redSinkRR = opToParseCtxMap.get(op).getRowResolver();
       ReduceSinkDesc conf = op.getConf();
-      List<Operator<? extends Serializable>> childOperators = op
+      List<Operator<? extends OperatorDesc>> childOperators = op
           .getChildOperators();
-      List<Operator<? extends Serializable>> parentOperators = op
+      List<Operator<? extends OperatorDesc>> parentOperators = op
           .getParentOperators();
 
       List<String> colLists = new ArrayList<String>();
@@ -259,7 +259,7 @@ public final class ColumnPrunerProcFacto
       if ((childOperators.size() == 1)
           && (childOperators.get(0) instanceof JoinOperator)) {
         assert parentOperators.size() == 1;
-        Operator<? extends Serializable> par = parentOperators.get(0);
+        Operator<? extends OperatorDesc> par = parentOperators.get(0);
         JoinOperator childJoin = (JoinOperator) childOperators.get(0);
         RowResolver parRR = opToParseCtxMap.get(par).getRowResolver();
         List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(
@@ -405,7 +405,7 @@ public final class ColumnPrunerProcFacto
 
       LateralViewJoinOperator lvJoin = null;
       if (op.getChildOperators() != null) {
-        for (Operator<? extends Serializable> child : op.getChildOperators()) {
+        for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
           // If one of my children is a FileSink or Script, return all columns.
           // Without this break, a bug in ReduceSink to Extract edge column
           // pruning will manifest
@@ -490,14 +490,14 @@ public final class ColumnPrunerProcFacto
      */
     private void handleChildren(SelectOperator op,
         List<String> retainedSelOutputCols, ColumnPrunerProcCtx cppCtx) throws SemanticException {
-      for (Operator<? extends Serializable> child : op.getChildOperators()) {
+      for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
         if (child instanceof ReduceSinkOperator) {
           boolean[] flags = getPruneReduceSinkOpRetainFlags(
               retainedSelOutputCols, (ReduceSinkOperator) child);
           pruneReduceSinkOperator(flags, (ReduceSinkOperator) child, cppCtx);
         } else if (child instanceof FilterOperator) {
           // filter operator has the same output columns as its parent
-          for (Operator<? extends Serializable> filterChild : child
+          for (Operator<? extends OperatorDesc> filterChild : child
               .getChildOperators()) {
             if (filterChild instanceof ReduceSinkOperator) {
               boolean[] flags = getPruneReduceSinkOpRetainFlags(
@@ -647,7 +647,7 @@ public final class ColumnPrunerProcFacto
   }
 
   private static void pruneOperator(NodeProcessorCtx ctx,
-      Operator<? extends Serializable> op,
+      Operator<? extends OperatorDesc> op,
       List<String> cols)
       throws SemanticException {
     // the pruning needs to preserve the order of columns in the input schema
@@ -671,7 +671,7 @@ public final class ColumnPrunerProcFacto
    * @return
    * @throws SemanticException
    */
-  private static List<String> preserveColumnOrder(Operator<? extends Serializable> op,
+  private static List<String> preserveColumnOrder(Operator<? extends OperatorDesc> op,
       List<String> cols)
       throws SemanticException {
     RowSchema inputSchema = op.getSchema();
@@ -696,10 +696,10 @@ public final class ColumnPrunerProcFacto
       Map<Byte, List<Integer>> retainMap, boolean mapJoin) throws SemanticException {
     ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
     Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
-    List<Operator<? extends Serializable>> childOperators = op
+    List<Operator<? extends OperatorDesc>> childOperators = op
         .getChildOperators();
 
-    for (Operator<? extends Serializable> child : childOperators) {
+    for (Operator<? extends OperatorDesc> child : childOperators) {
       if (child instanceof FileSinkOperator) {
         return;
       }
@@ -787,7 +787,7 @@ public final class ColumnPrunerProcFacto
 
     }
 
-    for (Operator<? extends Serializable> child : childOperators) {
+    for (Operator<? extends OperatorDesc> child : childOperators) {
       if (child instanceof ReduceSinkOperator) {
         boolean[] flags = getPruneReduceSinkOpRetainFlags(childColLists,
             (ReduceSinkOperator) child);

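The factory processors above all share one shape: the graph walker hands back an untyped Node, which is cast to the operator type before being used as a map key. A minimal sketch of that pattern, again with hypothetical stand-in classes (in Hive, Operator reaches Node through its base classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-ins; not the real Hive classes.
    interface OperatorDesc {}
    interface Node {}
    class Operator<T extends OperatorDesc> implements Node {}

    public class DefaultProcSketch {
      private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists =
          new HashMap<Operator<? extends OperatorDesc>, List<String>>();

      // Mirrors the shape of ColumnPrunerDefaultProc.process above: the cast
      // from Node is unchecked, since the wildcard type is erased at runtime.
      @SuppressWarnings("unchecked")
      public Object process(Node nd) {
        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
        prunedColLists.put(op, new ArrayList<String>());
        return null;
      }

      public static void main(String[] args) {
        DefaultProcSketch proc = new DefaultProcSketch();
        proc.process(new Operator<OperatorDesc>());
        System.out.println(proc.prunedColLists.size());  // prints 1
      }
    }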
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Aug 29 17:43:59 2012
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.plan.Lo
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -213,7 +214,7 @@ public class GenMRFileSink1 implements N
     }
 
     // create a dummy tableScan operator
-    Operator<? extends Serializable> tsMerge = OperatorFactory.get(
+    Operator<? extends OperatorDesc> tsMerge = OperatorFactory.get(
         TableScanDesc.class, inputRS);
 
     ArrayList<String> outputColumns = new ArrayList<String>();
@@ -335,7 +336,8 @@ public class GenMRFileSink1 implements N
 
     // Create a TableScan operator
     RowSchema inputRS = fsInput.getSchema();
-    Operator<? extends Serializable> tsMerge = OperatorFactory.get(TableScanDesc.class, inputRS);
+    Operator<? extends OperatorDesc> tsMerge =
+      OperatorFactory.get(TableScanDesc.class, inputRS);
 
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
@@ -510,7 +512,7 @@ public class GenMRFileSink1 implements N
    * @param parentFS the last FileSinkOperator in the parent MapReduce work
    * @return the MapredWork
    */
-  private MapredWork createMergeTask(HiveConf conf, Operator<? extends Serializable> topOp,
+  private MapredWork createMergeTask(HiveConf conf, Operator<? extends OperatorDesc> topOp,
       FileSinkDesc fsDesc) {
 
     ArrayList<String> aliases = new ArrayList<String>();
@@ -556,7 +558,7 @@ public class GenMRFileSink1 implements N
       work.setMapperCannotSpanPartns(true);
       work.setPathToAliases(pathToAliases);
       work.setAliasToWork(
-          new LinkedHashMap<String, Operator<? extends Serializable>>());
+          new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
       if (hasDynamicPartitions) {
         work.getPathToPartitionInfo().put(inputDir,
             new PartitionDesc(tblDesc, null));
@@ -696,11 +698,11 @@ public class GenMRFileSink1 implements N
       mvTask = findMoveTask(ctx.getMvTask(), fsOp);
     }
 
-    Operator<? extends Serializable> currTopOp = ctx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
     String currAliasId = ctx.getCurrAliasId();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
       ctx.getOpTaskMap();
-    List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
+    List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
     List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
 
     // Set the move task to be dependent on the current task

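The createMergeTask hunk above also shows the merge plan's bookkeeping maps picking up the new bound. A rough sketch of how such a plan is populated, under the assumption (from the hunks, not verified against the full file) that the merge task uses the input directory as its alias; MapredWorkSketch and the path are hypothetical:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;

    // Hypothetical stand-ins for the plan classes used by createMergeTask.
    interface OperatorDesc {}
    class Operator<T extends OperatorDesc> {}
    class MapredWorkSketch {
      LinkedHashMap<String, ArrayList<String>> pathToAliases;
      LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork;
    }

    public class MergeTaskSketch {
      public static void main(String[] args) {
        MapredWorkSketch work = new MapredWorkSketch();
        // Each input path maps to its aliases, and each alias maps to its top
        // operator, now bounded by OperatorDesc.
        work.pathToAliases = new LinkedHashMap<String, ArrayList<String>>();
        work.aliasToWork =
            new LinkedHashMap<String, Operator<? extends OperatorDesc>>();

        String inputDir = "/tmp/merge-input";  // hypothetical path
        ArrayList<String> aliases = new ArrayList<String>();
        aliases.add(inputDir);
        work.pathToAliases.put(inputDir, aliases);
        work.aliasToWork.put(inputDir, new Operator<OperatorDesc>());
        System.out.println(work.pathToAliases);
      }
    }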
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.Map;
 import java.util.Stack;
 
@@ -28,6 +27,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Processor for the rule - no specific rule fired.
@@ -39,7 +39,7 @@ public class GenMROperator implements No
 
   /**
    * Reduce Scan encountered.
-   * 
+   *
    * @param nd
    *          the reduce sink operator encountered
    * @param procCtx
@@ -49,10 +49,10 @@ public class GenMROperator implements No
       Object... nodeOutputs) throws SemanticException {
     GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
-    mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+    mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
         mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
         .getCurrAliasId()));
     return null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Wed Aug 29 17:43:59 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 /**
@@ -54,7 +55,7 @@ public class GenMRProcContext implements
    */
   public static class GenMapRedCtx {
     Task<? extends Serializable> currTask;
-    Operator<? extends Serializable> currTopOp;
+    Operator<? extends OperatorDesc> currTopOp;
     String currAliasId;
 
     public GenMapRedCtx() {
@@ -69,7 +70,7 @@ public class GenMRProcContext implements
      *          the current alias for the to operator
      */
     public GenMapRedCtx(Task<? extends Serializable> currTask,
-        Operator<? extends Serializable> currTopOp, String currAliasId) {
+        Operator<? extends OperatorDesc> currTopOp, String currAliasId) {
       this.currTask = currTask;
       this.currTopOp = currTopOp;
       this.currAliasId = currAliasId;
@@ -85,7 +86,7 @@ public class GenMRProcContext implements
     /**
      * @return current top operator
      */
-    public Operator<? extends Serializable> getCurrTopOp() {
+    public Operator<? extends OperatorDesc> getCurrTopOp() {
       return currTopOp;
     }
 
@@ -105,13 +106,13 @@ public class GenMRProcContext implements
     Task<? extends Serializable> uTask;
     List<String> taskTmpDir;
     List<TableDesc> tt_desc;
-    List<Operator<? extends Serializable>> listTopOperators;
+    List<Operator<? extends OperatorDesc>> listTopOperators;
 
     public GenMRUnionCtx() {
       uTask = null;
       taskTmpDir = new ArrayList<String>();
       tt_desc = new ArrayList<TableDesc>();
-      listTopOperators = new ArrayList<Operator<? extends Serializable>>();
+      listTopOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     }
 
     public Task<? extends Serializable> getUTask() {
@@ -138,16 +139,16 @@ public class GenMRProcContext implements
       return tt_desc;
     }
 
-    public List<Operator<? extends Serializable>> getListTopOperators() {
+    public List<Operator<? extends OperatorDesc>> getListTopOperators() {
       return listTopOperators;
     }
 
     public void setListTopOperators(
-        List<Operator<? extends Serializable>> listTopOperators) {
+        List<Operator<? extends OperatorDesc>> listTopOperators) {
       this.listTopOperators = listTopOperators;
     }
 
-    public void addListTopOperators(Operator<? extends Serializable> topOperator) {
+    public void addListTopOperators(Operator<? extends OperatorDesc> topOperator) {
       listTopOperators.add(topOperator);
     }
   }
@@ -159,7 +160,7 @@ public class GenMRProcContext implements
   public static class GenMRMapJoinCtx {
     String taskTmpDir;
     TableDesc tt_desc;
-    Operator<? extends Serializable> rootMapJoinOp;
+    Operator<? extends OperatorDesc> rootMapJoinOp;
     AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin;
 
     public GenMRMapJoinCtx() {
@@ -176,7 +177,7 @@ public class GenMRProcContext implements
      * @param oldMapJoin
      */
     public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc,
-        Operator<? extends Serializable> rootMapJoinOp,
+        Operator<? extends OperatorDesc> rootMapJoinOp,
         AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
       this.taskTmpDir = taskTmpDir;
       this.tt_desc = tt_desc;
@@ -203,7 +204,7 @@ public class GenMRProcContext implements
     /**
      * @return the childSelect
      */
-    public Operator<? extends Serializable> getRootMapJoinOp() {
+    public Operator<? extends OperatorDesc> getRootMapJoinOp() {
       return rootMapJoinOp;
     }
 
@@ -211,7 +212,7 @@ public class GenMRProcContext implements
      * @param rootMapJoinOp
      *          the rootMapJoinOp to set
      */
-    public void setRootMapJoinOp(Operator<? extends Serializable> rootMapJoinOp) {
+    public void setRootMapJoinOp(Operator<? extends OperatorDesc> rootMapJoinOp) {
       this.rootMapJoinOp = rootMapJoinOp;
     }
 
@@ -232,23 +233,24 @@ public class GenMRProcContext implements
   }
 
   private HiveConf conf;
-  private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
+  private
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap;
   private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
   private HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx> mapJoinTaskMap;
-  private List<Operator<? extends Serializable>> seenOps;
+  private List<Operator<? extends OperatorDesc>> seenOps;
   private List<FileSinkOperator> seenFileSinkOps;
 
   private ParseContext parseCtx;
   private List<Task<MoveWork>> mvTask;
   private List<Task<? extends Serializable>> rootTasks;
 
-  private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx;
+  private LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx;
   private Task<? extends Serializable> currTask;
-  private Operator<? extends Serializable> currTopOp;
+  private Operator<? extends OperatorDesc> currTopOp;
   private UnionOperator currUnionOp;
   private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
   private String currAliasId;
-  private List<Operator<? extends Serializable>> rootOps;
+  private List<Operator<? extends OperatorDesc>> rootOps;
   private DependencyCollectionTask dependencyTaskForMultiInsert;
 
   /**
@@ -287,11 +289,11 @@ public class GenMRProcContext implements
    */
   public GenMRProcContext(
       HiveConf conf,
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
-      List<Operator<? extends Serializable>> seenOps, ParseContext parseCtx,
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap,
+      List<Operator<? extends OperatorDesc>> seenOps, ParseContext parseCtx,
       List<Task<MoveWork>> mvTask,
       List<Task<? extends Serializable>> rootTasks,
-      LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
+      LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx,
       Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
     this.conf = conf;
     this.opTaskMap = opTaskMap;
@@ -307,7 +309,7 @@ public class GenMRProcContext implements
     currUnionOp = null;
     currMapJoinOp = null;
     currAliasId = null;
-    rootOps = new ArrayList<Operator<? extends Serializable>>();
+    rootOps = new ArrayList<Operator<? extends OperatorDesc>>();
     rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
     mapJoinTaskMap = new HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx>();
@@ -317,7 +319,8 @@ public class GenMRProcContext implements
   /**
    * @return reducer to task mapping
    */
-  public HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> getOpTaskMap() {
+  public HashMap<Operator<? extends OperatorDesc>,
+                 Task<? extends Serializable>> getOpTaskMap() {
     return opTaskMap;
   }
 
@@ -326,14 +329,14 @@ public class GenMRProcContext implements
    *          reducer to task mapping
    */
   public void setOpTaskMap(
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap) {
     this.opTaskMap = opTaskMap;
   }
 
   /**
    * @return operators already visited
    */
-  public List<Operator<? extends Serializable>> getSeenOps() {
+  public List<Operator<? extends OperatorDesc>> getSeenOps() {
     return seenOps;
   }
 
@@ -348,7 +351,7 @@ public class GenMRProcContext implements
    * @param seenOps
    *          operators already visited
    */
-  public void setSeenOps(List<Operator<? extends Serializable>> seenOps) {
+  public void setSeenOps(List<Operator<? extends OperatorDesc>> seenOps) {
     this.seenOps = seenOps;
   }
 
@@ -363,7 +366,7 @@ public class GenMRProcContext implements
   /**
    * @return top operators for tasks
    */
-  public List<Operator<? extends Serializable>> getRootOps() {
+  public List<Operator<? extends OperatorDesc>> getRootOps() {
     return rootOps;
   }
 
@@ -371,7 +374,7 @@ public class GenMRProcContext implements
    * @param rootOps
    *          top operators for tasks
    */
-  public void setRootOps(List<Operator<? extends Serializable>> rootOps) {
+  public void setRootOps(List<Operator<? extends OperatorDesc>> rootOps) {
     this.rootOps = rootOps;
   }
 
@@ -423,7 +426,7 @@ public class GenMRProcContext implements
   /**
    * @return operator to task mappings
    */
-  public LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> getMapCurrCtx() {
+  public LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> getMapCurrCtx() {
     return mapCurrCtx;
   }
 
@@ -432,7 +435,7 @@ public class GenMRProcContext implements
    *          operator to task mappings
    */
   public void setMapCurrCtx(
-      LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
+      LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx) {
     this.mapCurrCtx = mapCurrCtx;
   }
 
@@ -454,7 +457,7 @@ public class GenMRProcContext implements
   /**
    * @return current top operator
    */
-  public Operator<? extends Serializable> getCurrTopOp() {
+  public Operator<? extends OperatorDesc> getCurrTopOp() {
     return currTopOp;
   }
 
@@ -462,7 +465,7 @@ public class GenMRProcContext implements
    * @param currTopOp
    *          current top operator
    */
-  public void setCurrTopOp(Operator<? extends Serializable> currTopOp) {
+  public void setCurrTopOp(Operator<? extends OperatorDesc> currTopOp) {
     this.currTopOp = currTopOp;
   }
 

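Note the asymmetry these GenMRProcContext hunks preserve: Task keeps its Serializable bound while Operator migrates to OperatorDesc. A minimal sketch of the two bookkeeping maps and the GenMapRedCtx triple, with stand-in classes:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.LinkedHashMap;

    // Hypothetical stand-ins; only Operator's bound changes in this commit.
    interface OperatorDesc {}
    class Operator<T extends OperatorDesc> {}
    class Task<T extends Serializable> {}

    class GenMapRedCtx {
      final Task<? extends Serializable> currTask;
      final Operator<? extends OperatorDesc> currTopOp;
      final String currAliasId;
      GenMapRedCtx(Task<? extends Serializable> currTask,
          Operator<? extends OperatorDesc> currTopOp, String currAliasId) {
        this.currTask = currTask;
        this.currTopOp = currTopOp;
        this.currAliasId = currAliasId;
      }
    }

    public class ProcContextSketch {
      public static void main(String[] args) {
        // The operator-to-task and operator-to-context maps, with the new key bound.
        HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
            new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>();
        LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
            new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>();

        Operator<OperatorDesc> topOp = new Operator<OperatorDesc>();
        Task<Serializable> task = new Task<Serializable>();
        opTaskMap.put(topOp, task);
        mapCurrCtx.put(topOp, new GenMapRedCtx(task, topOp, "a"));
        System.out.println(opTaskMap.size() + " " + mapCurrCtx.size());  // prints 1 1
      }
    }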
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Processor for the rule - table scan followed by reduce sink.
@@ -43,7 +44,7 @@ public class GenMRRedSink1 implements No
 
   /**
    * Reduce Scan encountered.
-   * 
+   *
    * @param nd
    *          the reduce sink operator encountered
    * @param opProcCtx
@@ -54,15 +55,15 @@ public class GenMRRedSink1 implements No
     ReduceSinkOperator op = (ReduceSinkOperator) nd;
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork currPlan = (MapredWork) currTask.getWork();
-    Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
         .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Wed Aug 29 17:43:59 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Processor for the rule - reduce sink followed by reduce sink.
@@ -41,7 +42,7 @@ public class GenMRRedSink2 implements No
 
   /**
    * Reduce Scan encountered.
-   * 
+   *
    * @param nd
    *          the reduce sink operator encountered
    * @param opProcCtx
@@ -52,14 +53,14 @@ public class GenMRRedSink2 implements No
     ReduceSinkOperator op = (ReduceSinkOperator) nd;
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-    Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
     String currAliasId = mapredCtx.getCurrAliasId();
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Map<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
         .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Processor for the rule - union followed by reduce sink.
@@ -56,8 +57,8 @@ public class GenMRRedSink3 implements No
 
     // union consisted on a bunch of map-reduce jobs, and it has been split at
     // the union
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(ctx.getCurrUnionOp());
 
@@ -70,7 +71,7 @@ public class GenMRRedSink3 implements No
 
 
     MapredWork plan = (MapredWork) unionTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
         .getOpTaskMap();
     Task<? extends Serializable> reducerTask = opTaskMap.get(reducer);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Processor for the rule - map join followed by reduce sink.
@@ -43,7 +44,7 @@ public class GenMRRedSink4 implements No
 
   /**
    * Reduce Scan encountered.
-   * 
+   *
    * @param nd
    *          the reduce sink operator encountered
    * @param opProcCtx
@@ -58,13 +59,13 @@ public class GenMRRedSink4 implements No
 
     // map-join consisted on a bunch of map-only jobs, and it has been split
     // after the mapjoin
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork plan = (MapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
         .getOpTaskMap();
     Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Wed Aug 29 17:43:59 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QBParseInfo;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 /**
  * Processor for the rule - table scan.
@@ -62,17 +63,17 @@ public class GenMRTableScan1 implements 
     TableScanOperator op = (TableScanOperator) nd;
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
 
     // create a dummy MapReduce task
     MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx);
     Task<? extends Serializable> currTask = TaskFactory.get(currWork, parseCtx.getConf());
-    Operator<? extends Serializable> currTopOp = op;
+    Operator<? extends OperatorDesc> currTopOp = op;
     ctx.setCurrTask(currTask);
     ctx.setCurrTopOp(currTopOp);
 
     for (String alias : parseCtx.getTopOps().keySet()) {
-      Operator<? extends Serializable> currOp = parseCtx.getTopOps().get(alias);
+      Operator<? extends OperatorDesc> currOp = parseCtx.getTopOps().get(alias);
       if (currOp == op) {
         String currAliasId = alias;
         ctx.setCurrAliasId(currAliasId);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Aug 29 17:43:59 2012
@@ -39,13 +39,14 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -89,7 +90,7 @@ public class GenMRUnion1 implements Node
     }
     else {
       ctx.getMapCurrCtx().put(
-          (Operator<? extends Serializable>) union,
+          (Operator<? extends OperatorDesc>) union,
           new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
               ctx.getCurrAliasId()));
     }
@@ -127,8 +128,8 @@ public class GenMRUnion1 implements Node
    * @param uCtxTask
    */
   private void processSubQueryUnionCreateIntermediate(
-      Operator<? extends Serializable> parent,
-      Operator<? extends Serializable> child,
+      Operator<? extends OperatorDesc> parent,
+      Operator<? extends OperatorDesc> child,
       Task<? extends Serializable> uTask, GenMRProcContext ctx,
       GenMRUnionCtx uCtxTask) {
     ParseContext parseCtx = ctx.getParseCtx();
@@ -141,21 +142,23 @@ public class GenMRUnion1 implements Node
     String taskTmpDir = baseCtx.getMRTmpFileURI();
 
     // Create a file sink operator for this file name
-    Operator<? extends Serializable> fs_op = OperatorFactory.get(
+    Operator<? extends OperatorDesc> fs_op = OperatorFactory.get(
         new FileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
             HiveConf.ConfVars.COMPRESSINTERMEDIATE)), parent.getSchema());
 
     assert parent.getChildOperators().size() == 1;
     parent.getChildOperators().set(0, fs_op);
 
-    List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> parentOpList =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     parentOpList.add(parent);
     fs_op.setParentOperators(parentOpList);
 
     // Create a dummy table scan operator
-    Operator<? extends Serializable> ts_op = OperatorFactory.get(
+    Operator<? extends OperatorDesc> ts_op = OperatorFactory.get(
         new TableScanDesc(), parent.getSchema());
-    List<Operator<? extends Serializable>> childOpList = new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> childOpList =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     childOpList.add(child);
     ts_op.setChildOperators(childOpList);
     child.replaceParent(parent, ts_op);
@@ -199,8 +202,8 @@ public class GenMRUnion1 implements Node
     Task<? extends Serializable> uTask = uCtxTask.getUTask();
     MapredWork plan = (MapredWork) uTask.getWork();
     ctx.setCurrTask(uTask);
-    List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
-    Operator<? extends Serializable> topOp = ctx.getCurrTopOp();
+    List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
+    Operator<? extends OperatorDesc> topOp = ctx.getCurrTopOp();
     if (!seenOps.contains(topOp) && topOp != null) {
       seenOps.add(topOp);
       GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx
@@ -247,7 +250,7 @@ public class GenMRUnion1 implements Node
 
     // Map-only subqueries can be optimized in future to not write to a file in
     // future
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
 
     UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
 
@@ -305,7 +308,7 @@ public class GenMRUnion1 implements Node
 
     ctx.setCurrTask(uTask);
 
-    mapCurrCtx.put((Operator<? extends Serializable>) nd,
+    mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
         new GenMapRedCtx(ctx.getCurrTask(), null, null));
 
     return null;

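The processSubQueryUnionCreateIntermediate hunks above splice a FileSink under each union branch and re-parent the union onto a fresh TableScan, so each branch writes an intermediate file that the union task then reads. A minimal sketch of just that rewiring, using a deliberately simplified Op class rather than the real Operator hierarchy:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical minimal operator with parent/child links, enough to show
    // the rewiring; not the real Hive Operator.
    class Op {
      String name;
      List<Op> parents = new ArrayList<Op>();
      List<Op> children = new ArrayList<Op>();
      Op(String name) { this.name = name; }
    }

    public class UnionSplitSketch {
      public static void main(String[] args) {
        Op parent = new Op("subquery-tail");
        Op child = new Op("union");
        parent.children.add(child);
        child.parents.add(parent);

        // Replace the parent's single child with a file sink...
        Op fsOp = new Op("filesink");
        parent.children.set(0, fsOp);
        fsOp.parents.add(parent);

        // ...and hang the original child under a new table scan
        // (the equivalent of child.replaceParent(parent, ts_op)).
        Op tsOp = new Op("tablescan");
        tsOp.children.add(child);
        child.parents.set(child.parents.indexOf(parent), tsOp);

        System.out.println(parent.name + " -> " + parent.children.get(0).name);
        System.out.println(tsOp.name + " -> " + tsOp.children.get(0).name);
      }
    }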
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Aug 29 17:43:59 2012
@@ -61,15 +61,16 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 
 /**
  * General utility common functions for the Processor to convert operator into
@@ -92,14 +93,15 @@ public final class GenMapRedUtils {
    */
   public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
       throws SemanticException {
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+      opProcCtx.getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork plan = (MapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
       opProcCtx.getOpTaskMap();
-    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     opTaskMap.put(reducer, currTask);
     plan.setReducer(reducer);
@@ -117,7 +119,7 @@ public final class GenMapRedUtils {
     }
 
     assert currTopOp != null;
-    List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+    List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
     String currAliasId = opProcCtx.getCurrAliasId();
 
     if (!seenOps.contains(currTopOp)) {
@@ -134,8 +136,9 @@ public final class GenMapRedUtils {
   }
 
   public static void initMapJoinPlan(
-      Operator<? extends Serializable> op, GenMRProcContext ctx,
-      boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos) throws SemanticException {
+    Operator<? extends OperatorDesc> op, GenMRProcContext ctx,
+    boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos)
+    throws SemanticException {
     initMapJoinPlan(op, ctx, readInputMapJoin, readInputUnion, setReducer, pos, false);
   }
 
@@ -149,20 +152,21 @@ public final class GenMapRedUtils {
    * @param pos
    *          position of the parent
    */
-  public static void initMapJoinPlan(Operator<? extends Serializable> op,
+  public static void initMapJoinPlan(Operator<? extends OperatorDesc> op,
       GenMRProcContext opProcCtx, boolean readInputMapJoin,
       boolean readInputUnion, boolean setReducer, int pos, boolean createLocalPlan)
       throws SemanticException {
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+      opProcCtx.getMapCurrCtx();
     assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
     int parentPos = (pos == -1) ? 0 : pos;
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
         parentPos));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork plan = (MapredWork) currTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>>  opTaskMap =
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>  opTaskMap =
       opProcCtx.getOpTaskMap();
-    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     // The mapjoin has already been encountered. Some context must be stored
     // about that
@@ -173,7 +177,7 @@ public final class GenMapRedUtils {
           false : true;
 
       if (setReducer) {
-        Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+        Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
         plan.setReducer(reducer);
         opTaskMap.put(reducer, currTask);
         if (reducer.getClass() == JoinOperator.class) {
@@ -189,7 +193,7 @@ public final class GenMapRedUtils {
         GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
         String taskTmpDir;
         TableDesc tt_desc;
-        Operator<? extends Serializable> rootOp;
+        Operator<? extends OperatorDesc> rootOp;
 
         if (mjCtx.getOldMapJoin() == null || setReducer) {
           taskTmpDir = mjCtx.getTaskTmpDir();
@@ -222,7 +226,7 @@ public final class GenMapRedUtils {
       }
 
       assert currTopOp != null;
-      List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
       String currAliasId = opProcCtx.getCurrAliasId();
 
       seenOps.add(currTopOp);
@@ -249,7 +253,7 @@ public final class GenMapRedUtils {
           }
           if (localPlan == null && createLocalPlan) {
             localPlan = new MapredLocalWork(
-                new LinkedHashMap<String, Operator<? extends Serializable>>(),
+                new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
                 new LinkedHashMap<String, FetchWork>());
           }
         } else {
@@ -298,10 +302,10 @@ public final class GenMapRedUtils {
   public static void initUnionPlan(ReduceSinkOperator op,
       GenMRProcContext opProcCtx,
       Task<? extends Serializable> unionTask) throws SemanticException {
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
 
     MapredWork plan = (MapredWork) unionTask.getWork();
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
       opProcCtx.getOpTaskMap();
 
     opTaskMap.put(reducer, unionTask);
@@ -320,10 +324,10 @@ public final class GenMapRedUtils {
   private static void setUnionPlan(GenMRProcContext opProcCtx,
       boolean local, MapredWork plan, GenMRUnionCtx uCtx,
       boolean mergeTask) throws SemanticException {
-    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     if (currTopOp != null) {
-      List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
       String currAliasId = opProcCtx.getCurrAliasId();
       if (!seenOps.contains(currTopOp) || mergeTask) {
         seenOps.add(currTopOp);
@@ -340,7 +344,7 @@ public final class GenMapRedUtils {
         int size = taskTmpDirLst.size();
         assert local == false;
 
-        List<Operator<? extends Serializable>> topOperators =
+        List<Operator<? extends OperatorDesc>> topOperators =
             uCtx.getListTopOperators();
 
         for (int pos = 0; pos < size; pos++) {
@@ -422,7 +426,7 @@ public final class GenMapRedUtils {
     opProcCtx.setCurrTask(existingTask);
   }
 
-  public static void joinPlan(Operator<? extends Serializable> op,
+  public static void joinPlan(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
       GenMRProcContext opProcCtx, int pos, boolean split,
       boolean readMapJoinData, boolean readUnionData) throws SemanticException {
@@ -443,14 +447,14 @@ public final class GenMapRedUtils {
    * @param pos
    *          position of the parent in the stack
    */
-  public static void joinPlan(Operator<? extends Serializable> op,
+  public static void joinPlan(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
       GenMRProcContext opProcCtx, int pos, boolean split,
       boolean readMapJoinData, boolean readUnionData, boolean createLocalWork)
       throws SemanticException {
     Task<? extends Serializable> currTask = task;
     MapredWork plan = (MapredWork) currTask.getWork();
-    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
     List<Task<? extends Serializable>> parTasks = null;
 
     // terminate the old task and make current task dependent on it
@@ -471,7 +475,7 @@ public final class GenMapRedUtils {
     }
 
     if (currTopOp != null) {
-      List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
       String currAliasId = opProcCtx.getCurrAliasId();
 
       if (!seenOps.contains(currTopOp)) {
@@ -500,7 +504,7 @@ public final class GenMapRedUtils {
         AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = mjCtx.getOldMapJoin();
         String taskTmpDir = null;
         TableDesc tt_desc = null;
-        Operator<? extends Serializable> rootOp = null;
+        Operator<? extends OperatorDesc> rootOp = null;
 
         boolean local = ((pos == -1) || (pos == (mjOp.getConf())
             .getPosBigTable())) ? false : true;
@@ -552,7 +556,7 @@ public final class GenMapRedUtils {
     MapredWork cplan = getMapRedWork(parseCtx);
     Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
         .getConf());
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
 
     // Add the reducer
     cplan.setReducer(reducer);
@@ -560,7 +564,7 @@ public final class GenMapRedUtils {
 
     cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
 
-    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
       opProcCtx.getOpTaskMap();
     opTaskMap.put(reducer, redTask);
     Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
@@ -584,7 +588,7 @@ public final class GenMapRedUtils {
    *          processing context
    */
   public static void setTaskPlan(String alias_id,
-      Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
       GenMRProcContext opProcCtx) throws SemanticException {
     setTaskPlan(alias_id, topOp, plan, local, opProcCtx, null);
   }
@@ -606,7 +610,7 @@ public final class GenMapRedUtils {
    *          pruned partition list. If it is null it will be computed on-the-fly.
    */
   public static void setTaskPlan(String alias_id,
-      Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
     ParseContext parseCtx = opProcCtx.getParseCtx();
     Set<ReadEntity> inputs = opProcCtx.getInputs();
@@ -810,7 +814,7 @@ public final class GenMapRedUtils {
       MapredLocalWork localPlan = plan.getMapLocalWork();
       if (localPlan == null) {
         localPlan = new MapredLocalWork(
-            new LinkedHashMap<String, Operator<? extends Serializable>>(),
+            new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
             new LinkedHashMap<String, FetchWork>());
       }
 
@@ -845,7 +849,7 @@ public final class GenMapRedUtils {
    *          table descriptor
    */
   public static void setTaskPlan(String path, String alias,
-      Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
       TableDesc tt_desc) throws SemanticException {
 
     if(path == null || alias == null) {
@@ -864,7 +868,7 @@ public final class GenMapRedUtils {
       MapredLocalWork localPlan = plan.getMapLocalWork();
       if (localPlan == null) {
         localPlan = new MapredLocalWork(
-            new LinkedHashMap<String, Operator<? extends Serializable>>(),
+            new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
             new LinkedHashMap<String, FetchWork>());
       }
 
@@ -885,7 +889,7 @@ public final class GenMapRedUtils {
    *          current top operator in the path
    */
   public static void setKeyAndValueDesc(MapredWork plan,
-      Operator<? extends Serializable> topOp) {
+      Operator<? extends OperatorDesc> topOp) {
     if (topOp == null) {
       return;
     }
@@ -900,9 +904,9 @@ public final class GenMapRedUtils {
       }
       tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
     } else {
-      List<Operator<? extends Serializable>> children = topOp.getChildOperators();
+      List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
       if (children != null) {
-        for (Operator<? extends Serializable> op : children) {
+        for (Operator<? extends OperatorDesc> op : children) {
           setKeyAndValueDesc(plan, op);
         }
       }
@@ -935,7 +939,7 @@ public final class GenMapRedUtils {
     work.setMapperCannotSpanPartns(mapperCannotSpanPartns);
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
-    work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+    work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
     work.setTagToValueDesc(new ArrayList<TableDesc>());
     work.setReducer(null);
     work.setHadoopSupportsSplittable(
@@ -954,8 +958,8 @@ public final class GenMapRedUtils {
    *          parse context
    */
   @SuppressWarnings("nls")
-  private static Operator<? extends Serializable> putOpInsertMap(
-      Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx) {
+  public static Operator<? extends OperatorDesc> putOpInsertMap(
+      Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext parseCtx) {
     OpParseContext ctx = new OpParseContext(rr);
     parseCtx.getOpParseCtx().put(op, ctx);
     return op;
@@ -971,12 +975,12 @@ public final class GenMapRedUtils {
    * @param setReducer does the reducer needs to be set
    * @param pos position of the parent
    **/
-  public static void splitTasks(Operator<? extends Serializable> op,
+  public static void splitTasks(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> parentTask,
       Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
       boolean setReducer, boolean local, int posn) throws SemanticException {
     childTask.getWork();
-    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     ParseContext parseCtx = opProcCtx.getParseCtx();
     parentTask.addDependentTask(childTask);
@@ -992,7 +996,7 @@ public final class GenMapRedUtils {
     Context baseCtx = parseCtx.getContext();
     String taskTmpDir = baseCtx.getMRTmpFileURI();
 
-    Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
+    Operator<? extends OperatorDesc> parent = op.getParentOperators().get(posn);
     TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
         .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
 
@@ -1007,11 +1011,11 @@ public final class GenMapRedUtils {
       desc.setCompressType(parseCtx.getConf().getVar(
           HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
     }
-    Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
+    Operator<? extends OperatorDesc> fs_op = putOpInsertMap(OperatorFactory
         .get(desc, parent.getSchema()), null, parseCtx);
 
     // replace the reduce child with this operator
-    List<Operator<? extends Serializable>> childOpList = parent
+    List<Operator<? extends OperatorDesc>> childOpList = parent
     .getChildOperators();
     for (int pos = 0; pos < childOpList.size(); pos++) {
       if (childOpList.get(pos) == op) {
@@ -1020,30 +1024,31 @@ public final class GenMapRedUtils {
       }
     }
 
-    List<Operator<? extends Serializable>> parentOpList =
-      new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> parentOpList =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     parentOpList.add(parent);
     fs_op.setParentOperators(parentOpList);
 
     // create a dummy tableScan operator on top of op
     // TableScanOperator is implicitly created here for each MapOperator
     RowResolver rowResolver = opProcCtx.getParseCtx().getOpParseCtx().get(parent).getRowResolver();
-    Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
+    Operator<? extends OperatorDesc> ts_op = putOpInsertMap(OperatorFactory
         .get(TableScanDesc.class, parent.getSchema()), rowResolver, parseCtx);
 
-    childOpList = new ArrayList<Operator<? extends Serializable>>();
+    childOpList = new ArrayList<Operator<? extends OperatorDesc>>();
     childOpList.add(op);
     ts_op.setChildOperators(childOpList);
     op.getParentOperators().set(posn, ts_op);
 
-    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+      opProcCtx.getMapCurrCtx();
     mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
 
     String streamDesc = taskTmpDir;
     MapredWork cplan = (MapredWork) childTask.getWork();
 
     if (setReducer) {
-      Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+      Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
 
       if (reducer.getClass() == JoinOperator.class) {
         String origStreamDesc;

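The GenMapRedUtils hunks above are mechanical: every Operator<? extends Serializable> becomes Operator<? extends OperatorDesc>, and putOpInsertMap goes from private to public so callers outside this class can register operators in the parse context. As a rough illustration of what the narrower wildcard bound buys, here is a minimal, self-contained sketch; the nested types are stand-ins for the Hive classes, not the real API:

    import java.util.ArrayList;
    import java.util.List;

    public class BoundNarrowingSketch {
      // Stand-in for org.apache.hadoop.hive.ql.plan.OperatorDesc: the marker
      // interface every operator descriptor implements.
      interface OperatorDesc extends java.io.Serializable {}

      static class FileSinkDesc implements OperatorDesc {}

      // Stand-in for the Hive Operator class, parameterized on its descriptor.
      static class Operator<T extends OperatorDesc> {
        private final T conf;
        Operator(T conf) { this.conf = conf; }
        T getConf() { return conf; }
      }

      public static void main(String[] args) {
        List<Operator<? extends OperatorDesc>> ops =
            new ArrayList<Operator<? extends OperatorDesc>>();
        ops.add(new Operator<FileSinkDesc>(new FileSinkDesc()));

        // Under the old bound, Operator<T extends Serializable>, something
        // like Operator<String> type-checked because String is Serializable.
        // With the OperatorDesc bound that is a compile error, so collections
        // such as aliasToWork or opTaskMap can only ever hold real operators.
        System.out.println("descriptor: "
            + ops.get(0).getConf().getClass().getSimpleName());
      }
    }
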
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,9 +37,7 @@ import org.apache.hadoop.hive.ql.parse.Q
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-
-import java.io.Serializable;
-import java.util.Map;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * This optimizer is used to reduce the input size for queries which are
@@ -58,7 +58,7 @@ public class GlobalLimitOptimizer implem
 
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     Context ctx = pctx.getContext();
-    Map<String, Operator<? extends Serializable>> topOps = pctx.getTopOps();
+    Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
     GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
     Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
     Map<TableScanOperator, PrunedPartitionList> opToPartList = pctx.getOpToPartList();

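GlobalLimitOptimizer only needed its imports reshuffled and the topOps map retyped. For context, here is a toy version of the transform(ParseContext) entry point whose signature appears in the hunk; ParseContext and Transform below are skeletal stand-ins, not the Hive classes:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TransformSketch {
      // Skeletal stand-ins, not the Hive classes.
      interface OperatorDesc {}
      static class Operator<T extends OperatorDesc> {}

      // Toy ParseContext: just the alias -> top-operator map the optimizer reads.
      static class ParseContext {
        private final Map<String, Operator<? extends OperatorDesc>> topOps =
            new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
        Map<String, Operator<? extends OperatorDesc>> getTopOps() { return topOps; }
      }

      // The contract the optimizer implements: inspect a parse context,
      // possibly annotate it, and hand it back.
      interface Transform {
        ParseContext transform(ParseContext pctx);
      }

      public static void main(String[] args) {
        Transform globalLimitLike = new Transform() {
          public ParseContext transform(ParseContext pctx) {
            // Reads through the wildcard-bounded map need no casts.
            System.out.println("top operators: " + pctx.getTopOps().size());
            return pctx;
          }
        };
        globalLimitLike.transform(new ParseContext());
      }
    }
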
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 /**
@@ -175,7 +175,7 @@ public class GroupByOptimizer implements
       }
 
       for (String table : tblNames) {
-        Operator<? extends Serializable> topOp = pGraphContext.getTopOps().get(
+        Operator<? extends OperatorDesc> topOp = pGraphContext.getTopOps().get(
             table);
         if (topOp == null || (!(topOp instanceof TableScanOperator))) {
           // this is in a sub-query.

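The GroupByOptimizer hunk retypes the per-table top-operator lookup. The surrounding guard, visible above, skips any alias whose top operator is missing or is not a TableScanOperator (the sub-query case). A runnable sketch of that lookup-and-guard pattern, again with stand-in types:

    import java.util.HashMap;
    import java.util.Map;

    public class TopOpGuardSketch {
      // Stand-ins for the Hive operator types.
      interface OperatorDesc {}
      static class Operator<T extends OperatorDesc> {}
      static class TableScanDesc implements OperatorDesc {}
      static class TableScanOperator extends Operator<TableScanDesc> {}

      // Mirrors the guard in the hunk: a missing or non-scan top operator
      // means the alias comes from a sub-query, so there is nothing to do.
      static TableScanOperator findTopScan(
          Map<String, Operator<? extends OperatorDesc>> topOps, String table) {
        Operator<? extends OperatorDesc> topOp = topOps.get(table);
        if (topOp == null || !(topOp instanceof TableScanOperator)) {
          return null;
        }
        return (TableScanOperator) topOp;
      }

      public static void main(String[] args) {
        Map<String, Operator<? extends OperatorDesc>> topOps =
            new HashMap<String, Operator<? extends OperatorDesc>>();
        topOps.put("t1", new TableScanOperator());
        System.out.println(findTopScan(topOps, "t1") != null);  // true
        System.out.println(findTopScan(topOps, "sub") != null); // false
      }
    }
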
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -29,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Implementation of rule-based join table reordering optimization. User passes
@@ -42,7 +42,7 @@ public class JoinReorder implements Tran
    * the whole tree is traversed. Possible sizes: 0: the operator and its
    * subtree don't contain any big tables 1: the subtree of the operator
    * contains a big table 2: the operator is a big table
-   * 
+   *
    * @param operator
    *          The operator whose output size is to be estimated
    * @param bigTables
@@ -50,12 +50,12 @@ public class JoinReorder implements Tran
    * @return The estimated size - 0 (no streamed tables), 1 (streamed tables in
    *         subtree) or 2 (a streamed table)
    */
-  private int getOutputSize(Operator<? extends Serializable> operator,
+  private int getOutputSize(Operator<? extends OperatorDesc> operator,
       Set<String> bigTables) {
     // If a join operator contains a big subtree, there is a chance that its
     // output is also big, so the output size is 1 (medium)
     if (operator instanceof JoinOperator) {
-      for (Operator<? extends Serializable> o : operator.getParentOperators()) {
+      for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
         if (getOutputSize(o, bigTables) != 0) {
           return 1;
         }
@@ -74,7 +74,7 @@ public class JoinReorder implements Tran
     // the biggest output from a parent
     int maxSize = 0;
     if (operator.getParentOperators() != null) {
-      for (Operator<? extends Serializable> o : operator.getParentOperators()) {
+      for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
         int current = getOutputSize(o, bigTables);
         if (current > maxSize) {
           maxSize = current;
@@ -87,7 +87,7 @@ public class JoinReorder implements Tran
 
   /**
    * Find all big tables from STREAMTABLE hints.
-   * 
+   *
    * @param joinCtx
    *          The join context
    * @return Set of all big tables
@@ -107,7 +107,7 @@ public class JoinReorder implements Tran
   /**
    * Reorder the tables in a join operator appropriately (by reordering the tags
    * of the reduce sinks).
-   * 
+   *
    * @param joinOp
    *          The join operator to be processed
    * @param bigTables
@@ -148,7 +148,7 @@ public class JoinReorder implements Tran
    * Transform the query tree. For each join, check which reduce sink will
    * output the biggest result (based on STREAMTABLE hints) and give it the
    * biggest tag so that it gets streamed.
-   * 
+   *
    * @param pactx
    *          current parse context
    */

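The javadoc above defines a three-valued output-size estimate, and the first JoinReorder hunk shows the join case: a join over any big subtree is itself treated as medium (1). Below is a self-contained sketch of one way the whole recursion can realize those documented values; the toy Op type, and capping an inherited parent estimate at 1, are assumptions of this sketch rather than a transcription of the Hive method:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class OutputSizeSketch {
      // Toy operator node: alias is non-null only for scans of named tables.
      static class Op {
        final String alias;
        final boolean isJoin;
        final List<Op> parents;
        Op(String alias, boolean isJoin, List<Op> parents) {
          this.alias = alias;
          this.isJoin = isJoin;
          this.parents = parents;
        }
      }

      // 2 = the operator is a big (streamed) table, 1 = a big table somewhere
      // in its subtree, 0 = no big tables at all.
      static int getOutputSize(Op op, Set<String> bigTables) {
        if (op.isJoin) {
          // A join over a big subtree may itself produce a big output: medium.
          for (Op parent : op.parents) {
            if (getOutputSize(parent, bigTables) != 0) {
              return 1;
            }
          }
        }
        if (op.alias != null && bigTables.contains(op.alias)) {
          return 2;
        }
        // Otherwise inherit the largest parent estimate, capped at 1 so that
        // "contains a big table" never masquerades as "is a big table".
        int maxSize = 0;
        for (Op parent : op.parents) {
          maxSize = Math.max(maxSize,
              Math.min(1, getOutputSize(parent, bigTables)));
        }
        return maxSize;
      }

      public static void main(String[] args) {
        Op big = new Op("b", false, Collections.<Op>emptyList());
        Op small = new Op("s", false, Collections.<Op>emptyList());
        Op join = new Op(null, true, Arrays.asList(big, small));
        Set<String> bigTables = new HashSet<String>(Arrays.asList("b"));
        System.out.println(getOutputSize(big, bigTables));  // 2
        System.out.println(getOutputSize(join, bigTables)); // 1
      }
    }
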
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Aug 29 17:43:59 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
@@ -57,9 +58,9 @@ public final class MapJoinFactory {
     int pos = 0;
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
-    Operator<? extends Serializable> parent = (Operator<? extends Serializable>) stack
-        .get(size - 2);
-    List<Operator<? extends Serializable>> parOp = op.getParentOperators();
+    Operator<? extends OperatorDesc> parent =
+      (Operator<? extends OperatorDesc>) stack.get(size - 2);
+    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
     pos = parOp.indexOf(parent);
     assert pos < parOp.size();
     return pos;
@@ -72,24 +73,24 @@ public final class MapJoinFactory {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
+      Object... nodeOutputs) throws SemanticException {
       AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
           .getMapCurrCtx();
       GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
           pos));
       Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       MapredWork currPlan = (MapredWork) currTask.getWork();
-      Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+      Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
       String currAliasId = mapredCtx.getCurrAliasId();
-      Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
-          .getOpTaskMap();
+      Operator<? extends OperatorDesc> reducer = mapJoin;
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+        ctx.getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
       ctx.setCurrTopOp(currTopOp);
@@ -138,11 +139,11 @@ public final class MapJoinFactory {
           : true;
 
       GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
-          local, pos);
+        local, pos);
 
       currTask = opProcCtx.getCurrTask();
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
-          .getOpTaskMap();
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+        opProcCtx.getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
 
       // If the plan for this reducer does not exist, initialize the plan
@@ -195,9 +196,9 @@ public final class MapJoinFactory {
       if (listMapJoinOps.contains(mapJoin)) {
         ctx.setCurrAliasId(null);
         ctx.setCurrTopOp(null);
-        Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+        Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
             .getMapCurrCtx();
-        mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+        mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
             ctx.getCurrTask(), null, null));
         return null;
       }
@@ -230,14 +231,15 @@ public final class MapJoinFactory {
       sel.setParentOperators(null);
 
       // Create a file sink operator for this file name
-      Operator<? extends Serializable> fs_op = OperatorFactory.get(
+      Operator<? extends OperatorDesc> fs_op = OperatorFactory.get(
           new FileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
           HiveConf.ConfVars.COMPRESSINTERMEDIATE)), mapJoin.getSchema());
 
       assert mapJoin.getChildOperators().size() == 1;
       mapJoin.getChildOperators().set(0, fs_op);
 
-      List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+      List<Operator<? extends OperatorDesc>> parentOpList =
+        new ArrayList<Operator<? extends OperatorDesc>>();
       parentOpList.add(mapJoin);
       fs_op.setParentOperators(parentOpList);
 
@@ -247,9 +249,9 @@ public final class MapJoinFactory {
       ctx.setCurrAliasId(null);
       ctx.setCurrTopOp(null);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
           .getMapCurrCtx();
-      mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+      mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
           ctx.getCurrTask(), null, null));
 
       return null;
@@ -263,8 +265,9 @@ public final class MapJoinFactory {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
+      Object... nodeOutputs) throws SemanticException {
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin =
+        (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       ctx.getParseCtx();
@@ -282,16 +285,16 @@ public final class MapJoinFactory {
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
           .getMapCurrCtx();
       GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
           pos));
       Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       MapredWork currPlan = (MapredWork) currTask.getWork();
       mapredCtx.getCurrAliasId();
-      Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
-          .getOpTaskMap();
+      Operator<? extends OperatorDesc> reducer = mapJoin;
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+        ctx.getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
       ctx.setCurrTask(currTask);
@@ -321,7 +324,7 @@ public final class MapJoinFactory {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
+      Object... nodeOutputs) throws SemanticException {
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       ParseContext parseCtx = ctx.getParseCtx();
@@ -341,15 +344,15 @@ public final class MapJoinFactory {
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
-      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
           .getMapCurrCtx();
       GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
           pos));
       Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       MapredWork currPlan = (MapredWork) currTask.getWork();
-      Operator<? extends Serializable> reducer = mapJoin;
-      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
-          .getOpTaskMap();
+      Operator<? extends OperatorDesc> reducer = mapJoin;
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+        ctx.getOpTaskMap();
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
       // union result cannot be a map table

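Beyond the type swap, the first MapJoinFactory hunk reflows getPositionParent, which is small enough to restate: the rule walker's stack ends with the current operator, so the element just below it is the parent branch the walk arrived on. A stand-alone sketch with a hypothetical Node type (run with -ea so the asserts stay live):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Stack;

    public class PositionParentSketch {
      // Stand-in node; the Hive version walks Operator<? extends OperatorDesc>.
      static class Node {
        final List<Node> parents;
        Node(Node... parents) { this.parents = Arrays.asList(parents); }
      }

      // Mirrors getPositionParent in the first hunk above: the walker's stack
      // ends with the current node, the element just below it is the parent we
      // arrived from, and its index among the node's parents is the branch.
      static int getPositionParent(Node op, Stack<Node> stack) {
        int size = stack.size();
        assert size >= 2 && stack.get(size - 1) == op;
        Node parent = stack.get(size - 2);
        int pos = op.parents.indexOf(parent);
        assert pos < op.parents.size();
        return pos;
      }

      public static void main(String[] args) {
        Node left = new Node();
        Node right = new Node();
        Node join = new Node(left, right);
        Stack<Node> stack = new Stack<Node>();
        stack.push(right); // ...the walk descended through the right parent...
        stack.push(join);  // ...and is now visiting the join itself
        System.out.println(getPositionParent(join, stack)); // prints 1
      }
    }
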
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -69,6 +68,7 @@ import org.apache.hadoop.hive.ql.plan.Jo
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -97,8 +97,8 @@ public class MapJoinProcessor implements
   }
 
   @SuppressWarnings("nls")
-  private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op,
-      RowResolver rr) {
+  private Operator<? extends OperatorDesc>
+    putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
@@ -120,18 +120,18 @@ public class MapJoinProcessor implements
 
     // create a new  MapredLocalWork
     MapredLocalWork newLocalWork = new MapredLocalWork(
-        new LinkedHashMap<String, Operator<? extends Serializable>>(),
+        new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
         new LinkedHashMap<String, FetchWork>());
 
-    for (Map.Entry<String, Operator<? extends Serializable>> entry : newWork.getAliasToWork()
-        .entrySet()) {
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
+      newWork.getAliasToWork().entrySet()) {
       String alias = entry.getKey();
-      Operator<? extends Serializable> op = entry.getValue();
+      Operator<? extends OperatorDesc> op = entry.getValue();
 
       // if the table scan is for big table; then skip it
       // tracing down the operator tree from the table scan operator
-      Operator<? extends Serializable> parentOp = op;
-      Operator<? extends Serializable> childOp = op.getChildOperators().get(0);
+      Operator<? extends OperatorDesc> parentOp = op;
+      Operator<? extends OperatorDesc> childOp = op.getChildOperators().get(0);
       while ((childOp != null) && (!childOp.equals(mapJoinOp))) {
         parentOp = childOp;
         assert parentOp.getChildOperators().size() == 1;
@@ -218,10 +218,10 @@ public class MapJoinProcessor implements
   }
 
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
-      throws SemanticException {
+    throws SemanticException {
     try {
-      LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = newWork
-          .getOpParseCtxMap();
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
+        newWork.getOpParseCtxMap();
       QBJoinTree newJoinTree = newWork.getJoinTree();
       // generate the map join operator; already checked the map join
       MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
@@ -256,9 +256,9 @@ public class MapJoinProcessor implements
    * @param noCheckOuterJoin
    */
   public static MapJoinOperator convertMapJoin(
-      LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
-      throws SemanticException {
+    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+    JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
+    throws SemanticException {
     // outer join cannot be performed on a table which is being cached
     JoinDesc desc = op.getConf();
     JoinCondDesc[] condns = desc.getConds();
@@ -279,18 +279,22 @@ public class MapJoinProcessor implements
     // The join outputs a concatenation of all the inputs.
     QBJoinTree leftSrc = joinTree.getJoinSrc();
 
-    List<Operator<? extends Serializable>> parentOps = op.getParentOperators();
-    List<Operator<? extends Serializable>> newParentOps = new ArrayList<Operator<? extends Serializable>>();
-    List<Operator<? extends Serializable>> oldReduceSinkParentOps = new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> parentOps = op.getParentOperators();
+    List<Operator<? extends OperatorDesc>> newParentOps =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
+       new ArrayList<Operator<? extends OperatorDesc>>();
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    HashMap<Byte, HashMap<String, ExprNodeDesc>> columnTransfer = new HashMap<Byte, HashMap<String, ExprNodeDesc>>();
+    HashMap<Byte, HashMap<String, ExprNodeDesc>> columnTransfer =
+      new HashMap<Byte, HashMap<String, ExprNodeDesc>>();
 
     // found a source which is not to be stored in memory
     if (leftSrc != null) {
       // assert mapJoinPos == 0;
-      Operator<? extends Serializable> parentOp = parentOps.get(0);
+      Operator<? extends OperatorDesc> parentOp = parentOps.get(0);
       assert parentOp.getParentOperators().size() == 1;
-      Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+      Operator<? extends OperatorDesc> grandParentOp =
+        parentOp.getParentOperators().get(0);
       oldReduceSinkParentOps.add(parentOp);
       grandParentOp.removeChild(parentOp);
       newParentOps.add(grandParentOp);
@@ -300,9 +304,10 @@ public class MapJoinProcessor implements
     // Remove parent reduce-sink operators
     for (String src : joinTree.getBaseSrc()) {
       if (src != null) {
-        Operator<? extends Serializable> parentOp = parentOps.get(pos);
+        Operator<? extends OperatorDesc> parentOp = parentOps.get(pos);
         assert parentOp.getParentOperators().size() == 1;
-        Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+        Operator<? extends OperatorDesc> grandParentOp =
+          parentOp.getParentOperators().get(0);
 
         grandParentOp.removeChild(parentOp);
         oldReduceSinkParentOps.add(parentOp);
@@ -389,7 +394,7 @@ public class MapJoinProcessor implements
 
     Operator[] newPar = new Operator[newParentOps.size()];
     pos = 0;
-    for (Operator<? extends Serializable> o : newParentOps) {
+    for (Operator<? extends OperatorDesc> o : newParentOps) {
       newPar[pos++] = o;
     }
 
@@ -461,8 +466,8 @@ public class MapJoinProcessor implements
 
     // change the children of the original join operator to point to the map
     // join operator
-    List<Operator<? extends Serializable>> childOps = op.getChildOperators();
-    for (Operator<? extends Serializable> childOp : childOps) {
+    List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
+    for (Operator<? extends OperatorDesc> childOp : childOps) {
       childOp.replaceParent(op, mapJoinOp);
     }
 
@@ -482,7 +487,7 @@ public class MapJoinProcessor implements
         && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
 
 
-    LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = pctx
+    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
         .getOpParseCtx();
     MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
         noCheckOuterJoin);
@@ -577,7 +582,7 @@ public class MapJoinProcessor implements
   }
 
   private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws SemanticException {
-    List<Operator<? extends Serializable>> childOps = input.getChildOperators();
+    List<Operator<? extends OperatorDesc>> childOps = input.getChildOperators();
     input.setChildOperators(null);
 
     // create a dummy select - This select is needed by the walker to split the
@@ -613,7 +618,7 @@ public class MapJoinProcessor implements
 
     // Insert the select operator in between.
     sel.setChildOperators(childOps);
-    for (Operator<? extends Serializable> ch : childOps) {
+    for (Operator<? extends OperatorDesc> ch : childOps) {
       ch.replaceParent(input, sel);
     }
   }
@@ -764,12 +769,12 @@ public class MapJoinProcessor implements
     }
 
     private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOperator mapJoin) {
-      Operator<? extends Serializable> parent = mapJoin;
+      Operator<? extends OperatorDesc> parent = mapJoin;
       while (true) {
         if (parent.getChildOperators() == null || parent.getChildOperators().size() != 1) {
           return null;
         }
-        Operator<? extends Serializable> ch = parent.getChildOperators().get(0);
+        Operator<? extends OperatorDesc> ch = parent.getChildOperators().get(0);
         if (ch instanceof MapJoinOperator) {
           if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
             if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()