You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/10 17:40:05 UTC

svn commit: r1540490 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: Operator.java OperatorUtils.java ReduceSinkOperator.java mr/ExecDriver.java mr/ExecMapper.java mr/ExecReducer.java

Author: hashutosh
Date: Sun Nov 10 16:40:05 2013
New Revision: 1540490

URL: http://svn.apache.org/r1540490
Log:
HIVE-5753 : Remove collector from Operator base class (Mohammad Islam via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Nov 10 16:40:05 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -209,7 +208,6 @@ public abstract class Operator<T extends
   // non-bean ..
 
   protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
-  protected transient OutputCollector out;
   protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
   protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
   protected transient String alias;
@@ -255,19 +253,6 @@ public abstract class Operator<T extends
     }
   }
 
-  public void setOutputCollector(OutputCollector out) {
-    this.out = out;
-
-    // the collector is same across all operators
-    if (childOperators == null) {
-      return;
-    }
-
-    for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.setOutputCollector(out);
-    }
-  }
-
   /**
    * Store the alias this operator is working on behalf of.
    */
@@ -330,7 +315,6 @@ public abstract class Operator<T extends
     }
 
     this.configuration = hconf;
-    this.out = null;
     if (!areAllParentsInitialized()) {
       return;
     }
@@ -613,8 +597,6 @@ public abstract class Operator<T extends
         op.close(abort);
       }
 
-      out = null;
-
       LOG.info(id + " Close done");
     } catch (HiveException e) {
       e.printStackTrace();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Sun Nov 10 16:40:05 2013
@@ -20,8 +20,12 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.OutputCollector;
+
 public class OperatorUtils {
 
   public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
@@ -53,4 +57,17 @@ public class OperatorUtils {
     }
     return found;
   }
+
+  public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, OutputCollector out) {
+    if (childOperators == null) {
+      return;
+    }
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO:
+        ((ReduceSinkOperator)op).setOutputCollector(out);
+      } else {
+        setChildrenCollector(op.getChildOperators(), out);
+      }
+    }
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sun Nov 10 16:40:05 2013
@@ -47,7 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
@@ -56,6 +56,7 @@ public class ReduceSinkOperator extends 
     implements Serializable, TopNHash.BinaryCollector {
 
   private static final long serialVersionUID = 1L;
+  protected transient OutputCollector out;
 
   /**
    * The evaluators for the key columns. Key columns decide the sort order on
@@ -93,6 +94,10 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
+  public void setOutputCollector(OutputCollector _out) {
+    this.out = _out;
+  }
+
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
   @Override
@@ -382,6 +387,7 @@ public class ReduceSinkOperator extends 
       reducerHash.flush();
     }
     super.closeOp(abort);
+    out = null;
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Sun Nov 10 16:40:05 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.PartitionKeySampler;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -81,6 +82,7 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.log4j.Appender;
@@ -542,7 +544,7 @@ public class ExecDriver extends Task<Map
       FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
       try {
         ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
-        ts.setOutputCollector(sampler);
+        OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
         while (fetcher.pushRow()) { }
       } finally {
         fetcher.clearFetchContext();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Sun Nov 10 16:40:05 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -146,7 +147,7 @@ public class ExecMapper extends MapReduc
     if (oc == null) {
       oc = output;
       rp = reporter;
-      mo.setOutputCollector(oc);
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
       MapredContext.get().setReporter(reporter);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sun Nov 10 16:40:05 2013
@@ -177,7 +177,6 @@ public class ExecReducer extends MapRedu
       // propagate reporter and output collector to all operators
       oc = output;
       rp = reporter;
-      reducer.setOutputCollector(oc);
       reducer.setReporter(rp);
       MapredContext.get().setReporter(reporter);
     }