You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/10 17:40:05 UTC
svn commit: r1540490 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: Operator.java OperatorUtils.java ReduceSinkOperator.java mr/ExecDriver.java mr/ExecMapper.java mr/ExecReducer.java
Author: hashutosh
Date: Sun Nov 10 16:40:05 2013
New Revision: 1540490
URL: http://svn.apache.org/r1540490
Log:
HIVE-5753 : Remove collector from Operator base class (Mohammad Islam via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Nov 10 16:40:05 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
@@ -209,7 +208,6 @@ public abstract class Operator<T extends
// non-bean ..
protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
- protected transient OutputCollector out;
protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
protected transient String alias;
@@ -255,19 +253,6 @@ public abstract class Operator<T extends
}
}
- public void setOutputCollector(OutputCollector out) {
- this.out = out;
-
- // the collector is same across all operators
- if (childOperators == null) {
- return;
- }
-
- for (Operator<? extends OperatorDesc> op : childOperators) {
- op.setOutputCollector(out);
- }
- }
-
/**
* Store the alias this operator is working on behalf of.
*/
@@ -330,7 +315,6 @@ public abstract class Operator<T extends
}
this.configuration = hconf;
- this.out = null;
if (!areAllParentsInitialized()) {
return;
}
@@ -613,8 +597,6 @@ public abstract class Operator<T extends
op.close(abort);
}
- out = null;
-
LOG.info(id + " Close done");
} catch (HiveException e) {
e.printStackTrace();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Sun Nov 10 16:40:05 2013
@@ -20,8 +20,12 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.OutputCollector;
+
public class OperatorUtils {
public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
@@ -53,4 +57,17 @@ public class OperatorUtils {
}
return found;
}
+
+ public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, OutputCollector out) {
+ if (childOperators == null) {
+ return;
+ }
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO:
+ ((ReduceSinkOperator)op).setOutputCollector(out);
+ } else {
+ setChildrenCollector(op.getChildOperators(), out);
+ }
+ }
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sun Nov 10 16:40:05 2013
@@ -47,7 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.OutputCollector;
/**
* Reduce Sink Operator sends output to the reduce stage.
@@ -56,6 +56,7 @@ public class ReduceSinkOperator extends
implements Serializable, TopNHash.BinaryCollector {
private static final long serialVersionUID = 1L;
+ protected transient OutputCollector out;
/**
* The evaluators for the key columns. Key columns decide the sort order on
@@ -93,6 +94,10 @@ public class ReduceSinkOperator extends
return inputAlias;
}
+ public void setOutputCollector(OutputCollector _out) {
+ this.out = _out;
+ }
+
// picks topN K:V pairs from input.
protected transient TopNHash reducerHash = new TopNHash();
@Override
@@ -382,6 +387,7 @@ public class ReduceSinkOperator extends
reducerHash.flush();
}
super.closeOp(abort);
+ out = null;
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Sun Nov 10 16:40:05 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.PartitionKeySampler;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -81,6 +82,7 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.log4j.Appender;
@@ -542,7 +544,7 @@ public class ExecDriver extends Task<Map
FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
try {
ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
- ts.setOutputCollector(sampler);
+ OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
while (fetcher.pushRow()) { }
} finally {
fetcher.clearFetchContext();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Sun Nov 10 16:40:05 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -146,7 +147,7 @@ public class ExecMapper extends MapReduc
if (oc == null) {
oc = output;
rp = reporter;
- mo.setOutputCollector(oc);
+ OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
mo.setReporter(rp);
MapredContext.get().setReporter(reporter);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1540490&r1=1540489&r2=1540490&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sun Nov 10 16:40:05 2013
@@ -177,7 +177,6 @@ public class ExecReducer extends MapRedu
// propagate reporter and output collector to all operators
oc = output;
rp = reporter;
- reducer.setOutputCollector(oc);
reducer.setReporter(rp);
MapredContext.get().setReporter(reporter);
}