You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/08/29 19:44:02 UTC

svn commit: r1378659 [1/4] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/index/compact/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/lib/ java/org/apach...

Author: namit
Date: Wed Aug 29 17:43:59 2012
New Revision: 1378659

URL: http://svn.apache.org/viewvc?rev=1378659&view=rev
Log:
HIVE-3410 All operators' conf should inherit from a common class
(Namit via Carl)


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpWalkerCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrintOpTreeProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CollectDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExtractDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ForwardDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewForwardDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Aug 29 17:43:59 2012
@@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -509,7 +510,7 @@ public class Driver implements CommandPr
   }
 
   private void doAuthorization(BaseSemanticAnalyzer sem)
-      throws HiveException, AuthorizationException {
+    throws HiveException, AuthorizationException {
     HashSet<ReadEntity> inputs = sem.getInputs();
     HashSet<WriteEntity> outputs = sem.getOutputs();
     SessionState ss = SessionState.get();
@@ -583,9 +584,9 @@ public class Driver implements CommandPr
         ParseContext parseCtx = querySem.getParseContext();
         Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
 
-        for (Map.Entry<String, Operator<? extends Serializable>> topOpMap : querySem
+        for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem
             .getParseContext().getTopOps().entrySet()) {
-          Operator<? extends Serializable> topOp = topOpMap.getValue();
+          Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
           if (topOp instanceof TableScanOperator
               && tsoTopMap.containsKey(topOp)) {
             TableScanOperator tableScanOp = (TableScanOperator) topOp;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Aug 29 17:43:59 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.hooks.L
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
 import org.apache.hadoop.hive.ql.plan.api.NodeType;
 import org.apache.hadoop.hive.ql.plan.api.TaskType;
@@ -152,18 +153,18 @@ public class QueryPlan implements Serial
    */
   private void populateOperatorGraph(
       org.apache.hadoop.hive.ql.plan.api.Task task,
-      Collection<Operator<? extends Serializable>> topOps) {
+      Collection<Operator<? extends OperatorDesc>> topOps) {
 
     task.setOperatorGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
     task.getOperatorGraph().setNodeType(NodeType.OPERATOR);
 
-    Queue<Operator<? extends Serializable>> opsToVisit =
-      new LinkedList<Operator<? extends Serializable>>();
-    Set<Operator<? extends Serializable>> opsVisited =
-      new HashSet<Operator<? extends Serializable>>();
+    Queue<Operator<? extends OperatorDesc>> opsToVisit =
+      new LinkedList<Operator<? extends OperatorDesc>>();
+    Set<Operator<? extends OperatorDesc>> opsVisited =
+      new HashSet<Operator<? extends OperatorDesc>>();
     opsToVisit.addAll(topOps);
     while (opsToVisit.peek() != null) {
-      Operator<? extends Serializable> op = opsToVisit.remove();
+      Operator<? extends OperatorDesc> op = opsToVisit.remove();
       opsVisited.add(op);
       // populate the operator
       org.apache.hadoop.hive.ql.plan.api.Operator operator =
@@ -177,7 +178,7 @@ public class QueryPlan implements Serial
           new org.apache.hadoop.hive.ql.plan.api.Adjacency();
         entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
         entry.setNode(op.getOperatorId());
-        for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
+        for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
           entry.addToChildren(childOp.getOperatorId());
           if (!opsVisited.contains(childOp)) {
             opsToVisit.add(childOp);
@@ -230,8 +231,8 @@ public class QueryPlan implements Serial
           reduceTask.setTaskId(stage.getStageId() + "_REDUCE");
           reduceTask.setTaskType(TaskType.REDUCE);
           stage.addToTaskList(reduceTask);
-          Collection<Operator<? extends Serializable>> reducerTopOps =
-            new ArrayList<Operator<? extends Serializable>>();
+          Collection<Operator<? extends OperatorDesc>> reducerTopOps =
+            new ArrayList<Operator<? extends OperatorDesc>>();
           reducerTopOps.add(mrTask.getWork().getReducer());
           populateOperatorGraph(reduceTask, reducerTopOps);
         }
@@ -309,8 +310,11 @@ public class QueryPlan implements Serial
           } else {
             task.setStarted(started.contains(task.getTaskId()));
             task.setDone(done.contains(task.getTaskId()));
-            for (org.apache.hadoop.hive.ql.plan.api.Operator op : task
-                .getOperatorList()) {
+            if (task.getOperatorList() == null) {
+              return;
+            }
+            for (org.apache.hadoop.hive.ql.plan.api.Operator op :
+              task.getOperatorList()) {
               // if the task has started, all operators within the task have
               // started
               op.setStarted(started.contains(task.getTaskId()));
@@ -370,8 +374,8 @@ public class QueryPlan implements Serial
           done.add(task.getId() + "_MAP");
         }
         if (mrTask.hasReduce()) {
-          Collection<Operator<? extends Serializable>> reducerTopOps =
-            new ArrayList<Operator<? extends Serializable>>();
+          Collection<Operator<? extends OperatorDesc>> reducerTopOps =
+            new ArrayList<Operator<? extends OperatorDesc>>();
           reducerTopOps.add(mrTask.getWork().getReducer());
           extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
           if (mrTask.reduceStarted()) {
@@ -393,21 +397,21 @@ public class QueryPlan implements Serial
   }
 
   private void extractOperatorCounters(
-      Collection<Operator<? extends Serializable>> topOps, String taskId) {
-    Queue<Operator<? extends Serializable>> opsToVisit =
-      new LinkedList<Operator<? extends Serializable>>();
-    Set<Operator<? extends Serializable>> opsVisited =
-      new HashSet<Operator<? extends Serializable>>();
+      Collection<Operator<? extends OperatorDesc>> topOps, String taskId) {
+    Queue<Operator<? extends OperatorDesc>> opsToVisit =
+      new LinkedList<Operator<? extends OperatorDesc>>();
+    Set<Operator<? extends OperatorDesc>> opsVisited =
+      new HashSet<Operator<? extends OperatorDesc>>();
     opsToVisit.addAll(topOps);
     while (opsToVisit.size() != 0) {
-      Operator<? extends Serializable> op = opsToVisit.remove();
+      Operator<? extends OperatorDesc> op = opsToVisit.remove();
       opsVisited.add(op);
       counters.put(op.getOperatorId(), op.getCounters());
       if (op.getDone()) {
         done.add(op.getOperatorId());
       }
       if (op.getChildOperators() != null) {
-        for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
+        for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
           if (!opsVisited.contains(childOp)) {
             opsToVisit.add(childOp);
           }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Aug 29 17:43:59 2012
@@ -50,9 +50,9 @@ import org.apache.hadoop.hive.common.Com
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
@@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Fe
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -178,7 +179,7 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false otherwise.
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
       if (op.checkFatalErrors(ctrs, errMsg)) {
         return true;
       }
@@ -195,7 +196,8 @@ public class ExecDriver extends Task<Map
     // fix up outputs
     Map<String, ArrayList<String>> pa = work.getPathToAliases();
     if (pa != null) {
-      ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+      List<Operator<? extends OperatorDesc>> opList =
+        new ArrayList<Operator<? extends OperatorDesc>>();
 
       if (work.getReducer() != null) {
         opList.add(work.getReducer());
@@ -206,7 +208,7 @@ public class ExecDriver extends Task<Map
           opList.add(work.getAliasToWork().get(a));
 
           while (!opList.isEmpty()) {
-            Operator<? extends Serializable> op = opList.remove(0);
+            Operator<? extends OperatorDesc> op = opList.remove(0);
 
             if (op instanceof FileSinkOperator) {
               FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
@@ -489,7 +491,7 @@ public class ExecDriver extends Task<Map
       if (rj != null) {
         JobCloseFeedBack feedBack = new JobCloseFeedBack();
         if (work.getAliasToWork() != null) {
-          for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+          for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
             op.jobClose(job, success, feedBack);
           }
         }
@@ -743,7 +745,7 @@ public class ExecDriver extends Task<Map
   }
 
   @Override
-  public Collection<Operator<? extends Serializable>> getTopOperators() {
+  public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
     return getWork().getAliasToWork().values();
   }
 
@@ -947,11 +949,12 @@ public class ExecDriver extends Task<Map
     if (pa != null) {
       for (List<String> ls : pa.values()) {
         for (String a : ls) {
-          ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+          ArrayList<Operator<? extends OperatorDesc>> opList =
+            new ArrayList<Operator<? extends OperatorDesc>>();
           opList.add(work.getAliasToWork().get(a));
 
           while (!opList.isEmpty()) {
-            Operator<? extends Serializable> op = opList.remove(0);
+            Operator<? extends OperatorDesc> op = opList.remove(0);
 
             if (op instanceof FileSinkOperator) {
               FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
@@ -973,7 +976,7 @@ public class ExecDriver extends Task<Map
 
   @Override
   public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
-    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
       op.updateCounters(ctrs);
     }
     if (work.getReducer() != null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Wed Aug 29 17:43:59 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
@@ -31,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -104,8 +104,8 @@ public class ExecMapper extends MapReduc
       //The following code is for mapjoin
       //initialize all the dummy ops
       l4j.info("Initializing dummy operator");
-      List<Operator<? extends Serializable>> dummyOps = localWork.getDummyParentOp();
-      for(Operator<? extends Serializable> dummyOp : dummyOps){
+      List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+      for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
         dummyOp.setExecContext(execContext);
         dummyOp.initialize(jc,null);
       }
@@ -194,9 +194,9 @@ public class ExecMapper extends MapReduc
 
       //for close the local work
       if(localWork != null){
-        List<Operator<? extends Serializable>> dummyOps = localWork.getDummyParentOp();
+        List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
 
-        for(Operator<? extends Serializable> dummyOp : dummyOps){
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
           dummyOp.close(abort);
         }
       }
@@ -204,7 +204,7 @@ public class ExecMapper extends MapReduc
       if (fetchOperators != null) {
         MapredLocalWork localWork = mo.getConf().getMapLocalWork();
         for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-          Operator<? extends Serializable> forwardOp = localWork
+          Operator<? extends OperatorDesc> forwardOp = localWork
               .getAliasToWork().get(entry.getKey());
           forwardOp.close(abort);
         }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Wed Aug 29 17:43:59 2012
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
 import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,12 +42,12 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.json.JSONException;
 import org.json.JSONObject;
-import java.lang.reflect.InvocationTargetException;
 
 /**
  * ExplainTask implementation.
@@ -281,7 +282,8 @@ public class ExplainTask extends Task<Ex
     // If this is an operator then we need to call the plan generation on the
     // conf and then the children
     if (work instanceof Operator) {
-      Operator<? extends Serializable> operator = (Operator<? extends Serializable>) work;
+      Operator<? extends OperatorDesc> operator =
+        (Operator<? extends OperatorDesc>) work;
       if (operator.getConf() != null) {
         JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
             jsonOutput, jsonOutput ? 0 : indent);
@@ -291,7 +293,7 @@ public class ExplainTask extends Task<Ex
       }
 
       if (operator.getChildOperators() != null) {
-        for (Operator<? extends Serializable> op : operator.getChildOperators()) {
+        for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) {
           JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, jsonOutput ? 0 : indent + 2);
           if (jsonOutput) {
             json.put(operator.getOperatorId(), jsonOut);
@@ -651,6 +653,7 @@ public class ExplainTask extends Task<Ex
     throw new RuntimeException("Unexpected call");
   }
 
+  @Override
   public List<FieldSchema> getResultSchema() {
     FieldSchema tmpFieldSchema = new FieldSchema();
     List<FieldSchema> colList = new ArrayList<FieldSchema>();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 29 17:43:59 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
@@ -1057,7 +1058,7 @@ public class GroupByOperator extends Ope
 
   // Group by contains the columns needed - no need to aggregate from children
   public List<String> genColLists(
-      HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+      HashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
     List<String> colLists = new ArrayList<String>();
     ArrayList<ExprNodeDesc> keys = conf.getKeys();
     for (ExprNodeDesc key : keys) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Aug 29 17:43:59 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -58,7 +59,7 @@ import org.apache.hadoop.util.StringUtil
  * different from regular operators in that it starts off by processing a
  * Writable data structure from a Table (instead of a Hive Object).
  **/
-public class MapOperator extends Operator<MapredWork> implements Serializable {
+public class MapOperator extends Operator<MapredWork> implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 1L;
 
@@ -83,17 +84,17 @@ public class MapOperator extends Operato
   private Map<MapInputPath, MapOpCtx> opCtxMap;
   private final Set<MapInputPath> listInputPaths = new HashSet<MapInputPath>();
 
-  private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
+  private Map<Operator<? extends OperatorDesc>, ArrayList<String>> operatorToPaths;
 
-  private final Map<Operator<? extends Serializable>, MapOpCtx> childrenOpToOpCtxMap =
-    new HashMap<Operator<? extends Serializable>, MapOpCtx>();
+  private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
+    new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
 
-  private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
+  private ArrayList<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
 
   private static class MapInputPath {
     String path;
     String alias;
-    Operator<? extends Serializable> op;
+    Operator<? extends OperatorDesc> op;
 
     /**
      * @param path
@@ -101,7 +102,7 @@ public class MapOperator extends Operato
      * @param op
      */
     public MapInputPath(String path, String alias,
-        Operator<? extends Serializable> op) {
+        Operator<? extends OperatorDesc> op) {
       this.path = path;
       this.alias = alias;
       this.op = op;
@@ -129,11 +130,11 @@ public class MapOperator extends Operato
       return ret;
     }
 
-    public Operator<? extends Serializable> getOp() {
+    public Operator<? extends OperatorDesc> getOp() {
       return op;
     }
 
-    public void setOp(Operator<? extends Serializable> op) {
+    public void setOp(Operator<? extends OperatorDesc> op) {
       this.op = op;
     }
 
@@ -304,7 +305,7 @@ public class MapOperator extends Operato
    * need to be changed if the input changes
    **/
   private void setInspectorInput(MapInputPath inp) {
-    Operator<? extends Serializable> op = inp.getOp();
+    Operator<? extends OperatorDesc> op = inp.getOp();
 
     deserializer = opCtxMap.get(inp).getDeserializer();
     isPartitioned = opCtxMap.get(inp).isPartitioned();
@@ -367,9 +368,10 @@ public class MapOperator extends Operato
     Path fpath = new Path((new Path(HiveConf.getVar(hconf,
         HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
 
-    ArrayList<Operator<? extends Serializable>> children = new ArrayList<Operator<? extends Serializable>>();
+    ArrayList<Operator<? extends OperatorDesc>> children =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
-    operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>>();
+    operatorToPaths = new HashMap<Operator<? extends OperatorDesc>, ArrayList<String>>();
 
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
 
@@ -380,17 +382,17 @@ public class MapOperator extends Operato
         List<String> aliases = conf.getPathToAliases().get(onefile);
 
         for (String onealias : aliases) {
-          Operator<? extends Serializable> op = conf.getAliasToWork().get(
+          Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(
             onealias);
           LOG.info("Adding alias " + onealias + " to work list for file "
             + onefile);
           MapInputPath inp = new MapInputPath(onefile, onealias, op);
           opCtxMap.put(inp, opCtx);
           if (operatorToPaths.get(op) == null) {
-            operatorToPaths.put(op, new java.util.ArrayList<String>());
+            operatorToPaths.put(op, new ArrayList<String>());
           }
           operatorToPaths.get(op).add(onefile);
-          op.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
+          op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
           op.getParentOperators().add(this);
           // check for the operators who will process rows coming to this Map
           // Operator
@@ -423,11 +425,11 @@ public class MapOperator extends Operato
   public void initializeOp(Configuration hconf) throws HiveException {
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
-    List<Operator<? extends Serializable>> children = getChildOperators();
+    List<Operator<? extends OperatorDesc>> children = getChildOperators();
 
-    for (Entry<Operator<? extends Serializable>, MapOpCtx> entry : childrenOpToOpCtxMap
+    for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
         .entrySet()) {
-      Operator<? extends Serializable> child = entry.getKey();
+      Operator<? extends OperatorDesc> child = entry.getKey();
       MapOpCtx mapOpCtx = entry.getValue();
       // Add alias, table name, and partitions to hadoop conf so that their
       // children will
@@ -448,12 +450,12 @@ public class MapOperator extends Operato
       HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry
           .getValue().partName);
       MapInputPath input = entry.getKey();
-      Operator<? extends Serializable> op = input.op;
+      Operator<? extends OperatorDesc> op = input.op;
       // op is not in the children list, so need to remember it and close it
       // afterwards
       if (children.indexOf(op) == -1) {
         if (extraChildrenToClose == null) {
-          extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
+          extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
         }
         extraChildrenToClose.add(op);
         op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()});
@@ -467,7 +469,7 @@ public class MapOperator extends Operato
   @Override
   public void closeOp(boolean abort) throws HiveException {
     if (extraChildrenToClose != null) {
-      for (Operator<? extends Serializable> op : extraChildrenToClose) {
+      for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
         op.close(abort);
       }
     }
@@ -486,7 +488,7 @@ public class MapOperator extends Operato
       // Operator
       if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
         String onealias = conf.getPathToAliases().get(onefile).get(0);
-        Operator<? extends Serializable> op =
+        Operator<? extends OperatorDesc> op =
             conf.getAliasToWork().get(onealias);
 
         LOG.info("Processing alias " + onealias + " for file " + onefile);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Aug 29 17:43:59 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -548,7 +549,7 @@ public class MapRedTask extends ExecDriv
   }
 
   @Override
-  public Operator<? extends Serializable> getReducer() {
+  public Operator<? extends OperatorDesc> getReducer() {
     return getWork().getReducer();
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Wed Aug 29 17:43:59 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -312,7 +313,7 @@ public class MapredLocalTask extends Tas
       }
 
       // get the root operator
-      Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
+      Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
       // walk through the operator tree
       while (true) {
         InspectableObject row = fetchOp.getNextRow();
@@ -342,7 +343,8 @@ public class MapredLocalTask extends Tas
     for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
       JobConf jobClone = new JobConf(job);
 
-      Operator<? extends Serializable> tableScan = work.getAliasToWork().get(entry.getKey());
+      Operator<? extends OperatorDesc> tableScan =
+        work.getAliasToWork().get(entry.getKey());
       boolean setColumnsNeeded = false;
       if (tableScan instanceof TableScanOperator) {
         ArrayList<Integer> list = ((TableScanOperator) tableScan).getNeededColumnIDs();
@@ -366,7 +368,7 @@ public class MapredLocalTask extends Tas
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
       // get the forward op
       String alias = entry.getKey();
-      Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
+      Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
 
       // put the exe context into all the operators
       forwardOp.setExecContext(execContext);
@@ -386,8 +388,8 @@ public class MapredLocalTask extends Tas
 
   private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
     // find the (byte)tag for the map join(HashTableSinkOperator)
-    Operator<? extends Serializable> parentOp = work.getAliasToWork().get(alias);
-    Operator<? extends Serializable> childOp = parentOp.getChildOperators().get(0);
+    Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
+    Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
     while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) {
       parentOp = childOp;
       assert parentOp.getChildOperators().size() == 1;
@@ -447,7 +449,7 @@ public class MapredLocalTask extends Tas
   }
 
   @Override
-  public Collection<Operator<? extends Serializable>> getTopOperators() {
+  public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
     return getWork().getAliasToWork().values();
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 29 17:43:59 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -46,15 +47,15 @@ import org.apache.hadoop.mapred.Reporter
 /**
  * Base operator implementation.
  **/
-public abstract class Operator<T extends Serializable> implements Serializable,
-    Node {
+public abstract class Operator<T extends OperatorDesc> implements Serializable,Cloneable,
+  Node {
 
   // Bean methods
 
   private static final long serialVersionUID = 1L;
 
-  protected List<Operator<? extends Serializable>> childOperators;
-  protected List<Operator<? extends Serializable>> parentOperators;
+  protected List<Operator<? extends OperatorDesc>> childOperators;
+  protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
   /**
    * List of counter names associated with the operator. It contains the
@@ -122,11 +123,11 @@ public abstract class Operator<T extends
   }
 
   public void setChildOperators(
-      List<Operator<? extends Serializable>> childOperators) {
+      List<Operator<? extends OperatorDesc>> childOperators) {
     this.childOperators = childOperators;
   }
 
-  public List<Operator<? extends Serializable>> getChildOperators() {
+  public List<Operator<? extends OperatorDesc>> getChildOperators() {
     return childOperators;
   }
 
@@ -140,7 +141,7 @@ public abstract class Operator<T extends
     }
 
     ArrayList<Node> ret_vec = new ArrayList<Node>();
-    for (Operator<? extends Serializable> op : getChildOperators()) {
+    for (Operator<? extends OperatorDesc> op : getChildOperators()) {
       ret_vec.add(op);
     }
 
@@ -148,11 +149,11 @@ public abstract class Operator<T extends
   }
 
   public void setParentOperators(
-      List<Operator<? extends Serializable>> parentOperators) {
+      List<Operator<? extends OperatorDesc>> parentOperators) {
     this.parentOperators = parentOperators;
   }
 
-  public List<Operator<? extends Serializable>> getParentOperators() {
+  public List<Operator<? extends OperatorDesc>> getParentOperators() {
     return parentOperators;
   }
 
@@ -231,7 +232,7 @@ public abstract class Operator<T extends
       return;
     }
 
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setReporter(rep);
     }
   }
@@ -244,7 +245,7 @@ public abstract class Operator<T extends
       return;
     }
 
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setOutputCollector(out);
     }
   }
@@ -259,7 +260,7 @@ public abstract class Operator<T extends
       return;
     }
 
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setAlias(alias);
     }
   }
@@ -282,7 +283,7 @@ public abstract class Operator<T extends
     if (parentOperators == null) {
       return true;
     }
-    for (Operator<? extends Serializable> parent : parentOperators) {
+    for (Operator<? extends OperatorDesc> parent : parentOperators) {
       if (parent == null) {
         //return true;
         continue;
@@ -331,7 +332,7 @@ public abstract class Operator<T extends
       }
       childOperatorsTag = new int[childOperatorsArray.length];
       for (int i = 0; i < childOperatorsArray.length; i++) {
-        List<Operator<? extends Serializable>> parentOperators = childOperatorsArray[i]
+        List<Operator<? extends OperatorDesc>> parentOperators = childOperatorsArray[i]
             .getParentOperators();
         if (parentOperators == null) {
           throw new HiveException("Hive internal error: parent is null in "
@@ -361,7 +362,7 @@ public abstract class Operator<T extends
   public void initializeLocalWork(Configuration hconf) throws HiveException {
     if (childOperators != null) {
       for (int i =0; i<childOperators.size();i++) {
-        Operator<? extends Serializable> childOp = this.childOperators.get(i);
+        Operator<? extends OperatorDesc> childOp = this.childOperators.get(i);
         childOp.initializeLocalWork(hconf);
       }
     }
@@ -485,7 +486,7 @@ public abstract class Operator<T extends
     }
 
     LOG.debug("Starting group for children:");
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.startGroup();
     }
 
@@ -505,7 +506,7 @@ public abstract class Operator<T extends
     }
 
     LOG.debug("Ending group for children:");
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.endGroup();
     }
 
@@ -514,7 +515,7 @@ public abstract class Operator<T extends
 
   protected boolean allInitializedParentsAreClosed() {
     if (parentOperators != null) {
-      for (Operator<? extends Serializable> parent : parentOperators) {
+      for (Operator<? extends OperatorDesc> parent : parentOperators) {
         if(parent==null){
           continue;
         }
@@ -562,7 +563,7 @@ public abstract class Operator<T extends
         return;
       }
 
-      for (Operator<? extends Serializable> op : childOperators) {
+      for (Operator<? extends OperatorDesc> op : childOperators) {
         op.close(abort);
       }
 
@@ -595,7 +596,7 @@ public abstract class Operator<T extends
       return;
     }
 
-    for (Operator<? extends Serializable> op : childOperators) {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
       op.jobClose(conf, success, feedBack);
     }
   }
@@ -604,7 +605,7 @@ public abstract class Operator<T extends
    * Cache childOperators in an array for faster access. childOperatorsArray is
    * accessed per row, so it's important to make the access efficient.
    */
-  protected transient Operator<? extends Serializable>[] childOperatorsArray = null;
+  protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
   protected transient int[] childOperatorsTag;
 
   // counters for debugging
@@ -620,14 +621,14 @@ public abstract class Operator<T extends
    * @param newChild
    *          the new child
    */
-  public void replaceChild(Operator<? extends Serializable> child,
-      Operator<? extends Serializable> newChild) {
+  public void replaceChild(Operator<? extends OperatorDesc> child,
+      Operator<? extends OperatorDesc> newChild) {
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
     childOperators.set(childIndex, newChild);
   }
 
-  public void removeChild(Operator<? extends Serializable> child) {
+  public void removeChild(Operator<? extends OperatorDesc> child) {
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
     if (childOperators.size() == 1) {
@@ -651,7 +652,8 @@ public abstract class Operator<T extends
    * @param child   If this operator is not the only parent of the child. There can be unpredictable result.
    * @throws SemanticException
    */
-  public void removeChildAndAdoptItsChildren(Operator<? extends Serializable> child) throws SemanticException {
+  public void removeChildAndAdoptItsChildren(
+    Operator<? extends OperatorDesc> child) throws SemanticException {
     int childIndex = childOperators.indexOf(child);
     if (childIndex == -1) {
       throw new SemanticException(
@@ -664,18 +666,18 @@ public abstract class Operator<T extends
       childOperators.addAll(childIndex, child.getChildOperators());
     }
 
-    for (Operator<? extends Serializable> gc : child.getChildOperators()) {
-      List<Operator<? extends Serializable>> parents = gc.getParentOperators();
+    for (Operator<? extends OperatorDesc> gc : child.getChildOperators()) {
+      List<Operator<? extends OperatorDesc>> parents = gc.getParentOperators();
       int index = parents.indexOf(child);
       if (index == -1) {
         throw new SemanticException(
-            "Exception when trying to remove partition predicates: fail to find parent from child");
+          "Exception when trying to remove partition predicates: fail to find parent from child");
       }
       parents.set(index, this);
     }
   }
 
-  public void removeParent(Operator<? extends Serializable> parent) {
+  public void removeParent(Operator<? extends OperatorDesc> parent) {
     int parentIndex = parentOperators.indexOf(parent);
     assert parentIndex != -1;
     if (parentOperators.size() == 1) {
@@ -702,8 +704,8 @@ public abstract class Operator<T extends
    * @param newParent
    *          the new parent
    */
-  public void replaceParent(Operator<? extends Serializable> parent,
-      Operator<? extends Serializable> newParent) {
+  public void replaceParent(Operator<? extends OperatorDesc> parent,
+      Operator<? extends OperatorDesc> newParent) {
     int parentIndex = parentOperators.indexOf(parent);
     assert parentIndex != -1;
     parentOperators.set(parentIndex, newParent);
@@ -755,7 +757,7 @@ public abstract class Operator<T extends
 
     int childrenDone = 0;
     for (int i = 0; i < childOperatorsArray.length; i++) {
-      Operator<? extends Serializable> o = childOperatorsArray[i];
+      Operator<? extends OperatorDesc> o = childOperatorsArray[i];
       if (o.getDone()) {
         childrenDone++;
       } else {
@@ -778,7 +780,7 @@ public abstract class Operator<T extends
   public void reset(){
     this.state=State.INIT;
     if (childOperators != null) {
-      for (Operator<? extends Serializable> o : childOperators) {
+      for (Operator<? extends OperatorDesc> o : childOperators) {
         o.reset();
       }
     }
@@ -790,13 +792,13 @@ public abstract class Operator<T extends
    *
    */
   public static interface OperatorFunc {
-    void func(Operator<? extends Serializable> op);
+    void func(Operator<? extends OperatorDesc> op);
   }
 
   public void preorderMap(OperatorFunc opFunc) {
     opFunc.func(this);
     if (childOperators != null) {
-      for (Operator<? extends Serializable> o : childOperators) {
+      for (Operator<? extends OperatorDesc> o : childOperators) {
         o.preorderMap(opFunc);
       }
     }
@@ -863,7 +865,7 @@ public abstract class Operator<T extends
     if (childOperators != null) {
       s.append(ls);
       s.append("  <Children>");
-      for (Operator<? extends Serializable> o : childOperators) {
+      for (Operator<? extends OperatorDesc> o : childOperators) {
         s.append(o.dump(level + 2, seenOpts));
       }
       s.append(ls);
@@ -873,7 +875,7 @@ public abstract class Operator<T extends
     if (parentOperators != null) {
       s.append(ls);
       s.append("  <Parent>");
-      for (Operator<? extends Serializable> o : parentOperators) {
+      for (Operator<? extends OperatorDesc> o : parentOperators) {
         s.append("Id = " + o.id + " ");
         s.append(o.dump(level, seenOpts));
       }
@@ -1154,7 +1156,7 @@ public abstract class Operator<T extends
     // but, some operators may be updated more than once and that's ok
     if (getChildren() != null) {
       for (Node op : getChildren()) {
-        ((Operator<? extends Serializable>) op).updateCounters(ctrs);
+        ((Operator<? extends OperatorDesc>) op).updateCounters(ctrs);
       }
     }
   }
@@ -1189,7 +1191,7 @@ public abstract class Operator<T extends
 
     if (getChildren() != null) {
       for (Node op : getChildren()) {
-        if (((Operator<? extends Serializable>) op).checkFatalErrors(ctrs,
+        if (((Operator<? extends OperatorDesc>) op).checkFatalErrors(ctrs,
             errMsg)) {
           return true;
         }
@@ -1309,7 +1311,7 @@ public abstract class Operator<T extends
     this.execContext = execContext;
     if(this.childOperators != null) {
       for (int i = 0; i<this.childOperators.size();i++) {
-        Operator<? extends Serializable> op = this.childOperators.get(i);
+        Operator<? extends OperatorDesc> op = this.childOperators.get(i);
         op.setExecContext(execContext);
       }
     }
@@ -1321,7 +1323,7 @@ public abstract class Operator<T extends
     this.cleanUpInputFileChangedOp();
     if(this.childOperators != null) {
       for (int i = 0; i<this.childOperators.size();i++) {
-        Operator<? extends Serializable> op = this.childOperators.get(i);
+        Operator<? extends OperatorDesc> op = this.childOperators.get(i);
         op.cleanUpInputFileChanged();
       }
     }
@@ -1332,4 +1334,25 @@ public abstract class Operator<T extends
   public void cleanUpInputFileChangedOp() throws HiveException {
   }
 
+  @Override
+  public Operator<? extends OperatorDesc> clone()
+    throws CloneNotSupportedException {
+
+    List<Operator<? extends OperatorDesc>> parents = getParentOperators();
+    List<Operator<? extends OperatorDesc>> parentClones =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+
+    if (parents != null) {
+      for (Operator<? extends OperatorDesc> parent : parents) {
+        parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+      }
+    }
+
+    T descClone = (T)conf.clone();
+    Operator<? extends OperatorDesc> ret =
+      (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+        descClone, getSchema(), parentClones);
+
+    return ret;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.La
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -54,7 +54,7 @@ public final class OperatorFactory {
    *
    * @param <T>
    */
-  public static final class OpTuple<T extends Serializable> {
+  public static final class OpTuple<T extends OperatorDesc> {
     public Class<T> descClass;
     public Class<? extends Operator<T>> opClass;
 
@@ -93,7 +93,7 @@ public final class OperatorFactory {
         HashTableSinkOperator.class));
   }
 
-  public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
+  public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
 
     for (OpTuple o : opvec) {
       if (o.descClass == opClass) {
@@ -111,7 +111,7 @@ public final class OperatorFactory {
         + opClass.getName());
   }
 
-  public static <T extends Serializable> Operator<T> get(Class<T> opClass,
+  public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass,
       RowSchema rwsch) {
 
     Operator<T> ret = get(opClass);
@@ -122,36 +122,46 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of children operators.
    */
-  public static <T extends Serializable> Operator<T> get(T conf,
-      Operator<? extends Serializable>... oplist) {
+  public static <T extends OperatorDesc> Operator<T> get(T conf,
+    Operator<? extends OperatorDesc>... oplist) {
     Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
+    makeChild(ret, oplist);
+    return (ret);
+  }
+
+  /**
+   * Returns an operator given the conf and a list of children operators.
+   */
+  public static void makeChild(
+    Operator<? extends OperatorDesc> ret,
+    Operator<? extends OperatorDesc>... oplist) {
     if (oplist.length == 0) {
-      return (ret);
+      return;
     }
 
-    ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>>();
-    for (Operator op : oplist) {
+    ArrayList<Operator<? extends OperatorDesc>> clist =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    for (Operator<? extends OperatorDesc> op : oplist) {
       clist.add(op);
     }
     ret.setChildOperators(clist);
 
     // Add this parent to the children
-    for (Operator op : oplist) {
-      List<Operator<? extends Serializable>> parents = op.getParentOperators();
+    for (Operator<? extends OperatorDesc> op : oplist) {
+      List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
       if (parents == null) {
-        parents = new ArrayList<Operator<? extends Serializable>>();
+        parents = new ArrayList<Operator<? extends OperatorDesc>>();
       }
       parents.add(ret);
       op.setParentOperators(parents);
     }
-    return (ret);
   }
 
   /**
    * Returns an operator given the conf and a list of children operators.
    */
-  public static <T extends Serializable> Operator<T> get(T conf,
+  public static <T extends OperatorDesc> Operator<T> get(T conf,
       RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = get(conf, oplist);
     ret.setSchema(rwsch);
@@ -161,7 +171,7 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
       Operator... oplist) {
     Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
@@ -180,7 +190,8 @@ public final class OperatorFactory {
     }
 
     // add parents for the newly created operator
-    List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> parent =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     for (Operator op : oplist) {
       parent.add(op);
     }
@@ -193,8 +204,8 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
-      List<Operator<? extends Serializable>> oplist) {
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
+      List<Operator<? extends OperatorDesc>> oplist) {
     Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
     if (oplist.size() == 0) {
@@ -212,7 +223,8 @@ public final class OperatorFactory {
     }
 
     // add parents for the newly created operator
-    List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+    List<Operator<? extends OperatorDesc>> parent =
+      new ArrayList<Operator<? extends OperatorDesc>>();
     for (Operator op : oplist) {
       parent.add(op);
     }
@@ -225,7 +237,7 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
       RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
@@ -235,8 +247,8 @@ public final class OperatorFactory {
   /**
    * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
-      RowSchema rwsch, List<Operator<? extends Serializable>> oplist) {
+  public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
+      RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
     return (ret);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Aug 29 17:43:59 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.Bu
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -148,9 +149,9 @@ public class SMBMapJoinOperator extends 
     for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
         .entrySet()) {
       JobConf jobClone = new JobConf(hconf);
-      Operator<? extends Serializable> tableScan = localWork.getAliasToWork()
-      .get(entry.getKey());
-      if(tableScan instanceof TableScanOperator) {
+      Operator<? extends OperatorDesc> tableScan = localWork.getAliasToWork()
+        .get(entry.getKey());
+      if (tableScan instanceof TableScanOperator) {
         ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
         if (list != null) {
           ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
@@ -165,8 +166,8 @@ public class SMBMapJoinOperator extends 
     }
 
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
-          .get(entry.getKey());
+      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
+        .get(entry.getKey());
       // All the operators need to be initialized before process
       forwardOp.setExecContext(this.getExecContext());
       FetchOperator fetchOp = entry.getValue();
@@ -500,7 +501,7 @@ public class SMBMapJoinOperator extends 
       String tble = this.tagToAlias.get(tag);
       FetchOperator fetchOp = fetchOperators.get(tble);
 
-      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
           .get(tble);
       try {
         InspectableObject row = fetchOp.getNextRow();
@@ -565,7 +566,7 @@ public class SMBMapJoinOperator extends 
     super.closeOp(abort);
     if (fetchOperators != null) {
       for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        Operator<? extends Serializable> forwardOp = localWork
+        Operator<? extends OperatorDesc> forwardOp = localWork
             .getAliasToWork().get(entry.getKey());
         forwardOp.close(abort);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Wed Aug 29 17:43:59 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -94,11 +94,11 @@ public class SkewJoinHandler {
   List<Object> dummyKey = null;
   String taskId;
 
-  private final CommonJoinOperator<? extends Serializable> joinOp;
+  private final CommonJoinOperator<? extends OperatorDesc> joinOp;
   private final int numAliases;
   private final JoinDesc conf;
 
-  public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
+  public SkewJoinHandler(CommonJoinOperator<? extends OperatorDesc> joinOp) {
     this.joinOp = joinOp;
     numAliases = joinOp.numAliases;
     conf = joinOp.getConf();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Aug 29 17:43:59 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -350,18 +351,18 @@ public abstract class Task<T extends Ser
     return false;
   }
 
-  public Collection<Operator<? extends Serializable>> getTopOperators() {
-    return new LinkedList<Operator<? extends Serializable>>();
+  public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
+    return new LinkedList<Operator<? extends OperatorDesc>>();
   }
-  
+
   public boolean hasReduce() {
     return false;
   }
 
-  public Operator<? extends Serializable> getReducer() {
+  public Operator<? extends OperatorDesc> getReducer() {
     return null;
   }
-  
+
   public HashMap<String, Long> getCounters() {
     return taskCounters;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java Wed Aug 29 17:43:59 2012
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
 /**
  * Terminal Operator Base Class.
  **/
-public abstract class TerminalOperator<T extends Serializable> extends
+public abstract class TerminalOperator<T extends OperatorDesc> extends
     Operator<T> implements Serializable {
   private static final long serialVersionUID = 1L;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Aug 29 17:43:59 2012
@@ -118,8 +118,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -135,8 +135,8 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.index.compact;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -57,14 +56,13 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 
 public class CompactIndexHandler extends TableBasedIndexHandler {
 
@@ -252,9 +250,11 @@ public class CompactIndexHandler extends
    * @param operators
    * @return whether or not it has found its target
    */
-  private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
-    for (Operator<? extends Serializable> op : operators) {
-      if (op instanceof FilterOperator && ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
+  private boolean findIndexColumnFilter(
+    Collection<Operator<? extends OperatorDesc>> operators) {
+    for (Operator<? extends OperatorDesc> op : operators) {
+      if (op instanceof FilterOperator &&
+        ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
         // Is this the target
         if (findIndexColumnExprNodeDesc(((FilterOperator)op).getConf().getPredicate())) {
           ((FilterOperator)op).getConf().setSortedFilter(true);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -41,6 +40,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
@@ -224,10 +224,10 @@ public class CombineHiveInputFormat<K ex
   // Splits are not shared across different partitions with different input formats.
   // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
   private static class CombinePathInputFormat {
-    private final List<Operator<? extends Serializable>> opList;
+    private final List<Operator<? extends OperatorDesc>> opList;
     private final String inputFormatClassName;
 
-    public CombinePathInputFormat(List<Operator<? extends Serializable>> opList,
+    public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList,
                                   String inputFormatClassName) {
       this.opList = opList;
       this.inputFormatClassName = inputFormatClassName;
@@ -259,7 +259,7 @@ public class CombineHiveInputFormat<K ex
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     init(job);
     Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
-    Map<String, Operator<? extends Serializable>> aliasToWork =
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork =
       mrwork.getAliasToWork();
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
@@ -341,7 +341,7 @@ public class CombineHiveInputFormat<K ex
 
       // Does a pool exist for this path already
       CombineFilter f = null;
-      List<Operator<? extends Serializable>> opList = null;
+      List<Operator<? extends OperatorDesc>> opList = null;
       boolean done = false;
 
       if (!mrwork.isMapperCannotSpanPartns()) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Aug 29 17:43:59 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,15 +30,16 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -385,11 +385,11 @@ public final class HiveFileFormatUtils {
    * @param aliasToWork    The operator tree to be invoked for a given alias
    * @param dir            The path to look for
    **/
-  public static List<Operator<? extends Serializable>> doGetWorksFromPath(
+  public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath(
     Map<String, ArrayList<String>> pathToAliases,
-    Map<String, Operator<? extends Serializable>> aliasToWork, Path dir) {
-    List<Operator<? extends Serializable>> opList =
-      new ArrayList<Operator<? extends Serializable>>();
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) {
+    List<Operator<? extends OperatorDesc>> opList =
+      new ArrayList<Operator<? extends OperatorDesc>>();
 
     List<String> aliases = doGetAliasesFromPath(pathToAliases, dir);
     for (String alias : aliases) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -416,8 +416,8 @@ public class HiveInputFormat<K extends W
     }
 
     for (String alias : aliases) {
-      Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(
-          alias);
+      Operator<? extends OperatorDesc> op = this.mrwork.getAliasToWork().get(
+        alias);
       if (op != null && op instanceof TableScanOperator) {
         TableScanOperator tableScan = (TableScanOperator) op;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.lib;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
@@ -43,7 +42,7 @@ public class DefaultGraphWalker implemen
 
   /**
    * Constructor.
-   * 
+   *
    * @param disp
    *          dispatcher to call for each op encountered
    */
@@ -68,7 +67,7 @@ public class DefaultGraphWalker implemen
 
   /**
    * Dispatch the current operator.
-   * 
+   *
    * @param nd
    *          node being walked
    * @param ndStack
@@ -91,7 +90,7 @@ public class DefaultGraphWalker implemen
 
   /**
    * starting point for walking.
-   * 
+   *
    * @throws SemanticException
    */
   public void startWalking(Collection<Node> startNodes,
@@ -108,7 +107,7 @@ public class DefaultGraphWalker implemen
 
   /**
    * walk the current operator and its descendants.
-   * 
+   *
    * @param nd
    *          current operator in the graph
    * @throws SemanticException
@@ -122,7 +121,7 @@ public class DefaultGraphWalker implemen
         || getDispatchedList().containsAll(nd.getChildren())) {
       // all children are done or no need to walk the children
       if (!getDispatchedList().contains(nd)) {
-        dispatch(nd, opStack);        
+        dispatch(nd, opStack);
       }
       opStack.pop();
       return;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Aug 29 17:43:59 2012
@@ -543,7 +543,7 @@ public class Table implements Serializab
     tTable.getSd().getSkewedInfo().setSkewedColNames(skewedColNames);
   }
 
-  public List<String> getSkewedColName() {
+  public List<String> getSkewedColNames() {
     return tTable.getSd().getSkewedInfo().getSkewedColNames();
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 /**
@@ -188,7 +188,8 @@ public class BucketMapJoinOptimizer impl
       LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
           new LinkedHashMap<String, List<List<String>>>();
 
-      Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext.getTopOps();
+      Map<String, Operator<? extends OperatorDesc>> topOps =
+        this.pGraphContext.getTopOps();
       Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
 
       // (partition to bucket file names) and (partition to bucket number) for

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * Implementation of one of the rule-based optimization steps. ColumnPruner gets
@@ -50,7 +50,7 @@ import org.apache.hadoop.hive.ql.parse.S
  */
 public class ColumnPruner implements Transform {
   protected ParseContext pGraphContext;
-  private HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+  private HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap;
 
   /**
    * empty constructor.