You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/05 20:52:26 UTC
svn commit: r1615978 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec:
OperatorFactory.java ReduceSinkOperator.java mr/ExecMapper.java
mr/ExecReducer.java tez/MapRecordProcessor.java
tez/ReduceRecordProcessor.java
Author: szehon
Date: Tue Aug 5 18:52:26 2014
New Revision: 1615978
URL: http://svn.apache.org/r1615978
Log:
HIVE-7596 : Cleanup OperatorFactory, ReduceSinkOperator, and reportStats (Brock Noland via Szehon)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Aug 5 18:52:26 2014
@@ -64,23 +64,9 @@ import org.apache.hadoop.hive.ql.plan.Un
*
*/
public final class OperatorFactory {
+ private static final List<OpTuple> opvec;
+ private static final List<OpTuple> vectorOpvec;
- /**
- * OpTuple.
- *
- * @param <T>
- */
- public static final class OpTuple<T extends OperatorDesc> {
- public Class<T> descClass;
- public Class<? extends Operator<T>> opClass;
-
- public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
- this.descClass = descClass;
- this.opClass = opClass;
- }
- }
-
- public static ArrayList<OpTuple> opvec;
static {
opvec = new ArrayList<OpTuple>();
opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class));
@@ -116,7 +102,6 @@ public final class OperatorFactory {
MuxOperator.class));
}
- public static ArrayList<OpTuple> vectorOpvec;
static {
vectorOpvec = new ArrayList<OpTuple>();
vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
@@ -130,6 +115,17 @@ public final class OperatorFactory {
vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
}
+ private static final class OpTuple<T extends OperatorDesc> {
+ private final Class<T> descClass;
+ private final Class<? extends Operator<T>> opClass;
+
+ public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+ this.descClass = descClass;
+ this.opClass = opClass;
+ }
+ }
+
+
public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
VectorizationContext vContext) throws HiveException {
Class<T> descClass = (Class<T>) conf.getClass();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Aug 5 18:52:26 2014
@@ -25,6 +25,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -61,9 +63,20 @@ public class ReduceSinkOperator extends
PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
}
+ private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
private static final long serialVersionUID = 1L;
- protected transient OutputCollector out;
+ private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
+
+ private transient ObjectInspector[] partitionObjectInspectors;
+ private transient ObjectInspector[] bucketObjectInspectors;
+ private transient int buckColIdxInKey;
+ private boolean firstRow;
+ private transient int tag;
+ private boolean skipTag = false;
+ private transient InspectableObject tempInspectableObject = new InspectableObject();
+ private transient int[] valueIndex; // index for value(+ from keys, - from values)
+ protected transient OutputCollector out;
/**
* The evaluators for the key columns. Key columns decide the sort order on
* the reducer side. Key columns are passed to the reducer in the "key".
@@ -84,38 +97,40 @@ public class ReduceSinkOperator extends
* Evaluators for bucketing columns. This is used to compute bucket number.
*/
protected transient ExprNodeEvaluator[] bucketEval = null;
-
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
- // ready
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
protected transient Serializer keySerializer;
protected transient boolean keyIsText;
protected transient Serializer valueSerializer;
- transient int tag;
protected transient byte[] tagByte = new byte[1];
- transient protected int numDistributionKeys;
- transient protected int numDistinctExprs;
- transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
- private boolean skipTag = false;
+ protected transient int numDistributionKeys;
+ protected transient int numDistinctExprs;
+ protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
protected transient boolean autoParallel = false;
-
- protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
-
- private transient int[] valueIndex; // index for value(+ from keys, - from values)
-
- public void setInputAliases(String[] inputAliases) {
- this.inputAliases = inputAliases;
- }
-
- public String[] getInputAliases() {
- return inputAliases;
- }
-
- public void setOutputCollector(OutputCollector _out) {
- this.out = _out;
- }
-
// picks topN K:V pairs from input.
protected transient TopNHash reducerHash = new TopNHash();
+ protected transient HiveKey keyWritable = new HiveKey();
+ protected transient ObjectInspector keyObjectInspector;
+ protected transient ObjectInspector valueObjectInspector;
+ protected transient Object[] cachedValues;
+ protected transient List<List<Integer>> distinctColIndices;
+ protected transient Random random;
+ /**
+ * This two dimensional array holds key data and a corresponding Union object
+ * which contains the tag identifying the aggregate expression for distinct columns.
+ *
+ * If there is no distict expression, cachedKeys is simply like this.
+ * cachedKeys[0] = [col0][col1]
+ *
+ * with two distict expression, union(tag:key) is attatched for each distinct expression
+ * cachedKeys[0] = [col0][col1][0:dist1]
+ * cachedKeys[1] = [col0][col1][1:dist2]
+ *
+ * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
+ * see {@link ExprNodeColumnEvaluator}
+ */
+ // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
+ protected transient Object[][] cachedKeys;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
@@ -184,40 +199,12 @@ public class ReduceSinkOperator extends
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
- e.printStackTrace();
+ String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
+ LOG.error(msg, e);
throw new RuntimeException(e);
}
}
- transient InspectableObject tempInspectableObject = new InspectableObject();
- protected transient HiveKey keyWritable = new HiveKey();
-
- protected transient ObjectInspector keyObjectInspector;
- protected transient ObjectInspector valueObjectInspector;
- transient ObjectInspector[] partitionObjectInspectors;
- transient ObjectInspector[] bucketObjectInspectors = null;
- transient int buckColIdxInKey;
-
- protected transient Object[] cachedValues;
- protected transient List<List<Integer>> distinctColIndices;
- /**
- * This two dimensional array holds key data and a corresponding Union object
- * which contains the tag identifying the aggregate expression for distinct columns.
- *
- * If there is no distict expression, cachedKeys is simply like this.
- * cachedKeys[0] = [col0][col1]
- *
- * with two distict expression, union(tag:key) is attatched for each distinct expression
- * cachedKeys[0] = [col0][col1][0:dist1]
- * cachedKeys[1] = [col0][col1][1:dist2]
- *
- * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
- * see {@link ExprNodeColumnEvaluator}
- */
- // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
- protected transient Object[][] cachedKeys;
- boolean firstRow;
- protected transient Random random;
/**
* Initializes array of ExprNodeEvaluator. Adds Union field for distinct
@@ -509,4 +496,16 @@ public class ReduceSinkOperator extends
public int[] getValueIndex() {
return valueIndex;
}
+
+ public void setInputAliases(String[] inputAliases) {
+ this.inputAliases = inputAliases;
+ }
+
+ public String[] getInputAliases() {
+ return inputAliases;
+ }
+
+ public void setOutputCollector(OutputCollector _out) {
+ this.out = _out;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Aug 5 18:52:26 2014
@@ -250,7 +250,7 @@ public class ExecMapper extends MapReduc
+ used_memory);
}
- reportStats rps = new reportStats(rp);
+ ReportStats rps = new ReportStats(rp);
mo.preorderMap(rps);
return;
} catch (Exception e) {
@@ -285,10 +285,10 @@ public class ExecMapper extends MapReduc
* reportStats.
*
*/
- public static class reportStats implements Operator.OperatorFunc {
- Reporter rp;
+ public static class ReportStats implements Operator.OperatorFunc {
+ private final Reporter rp;
- public reportStats(Reporter rp) {
+ public ReportStats(Reporter rp) {
this.rp = rp;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Aug 5 18:52:26 2014
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -306,7 +306,7 @@ public class ExecReducer extends MapRedu
}
reducer.close(abort);
- reportStats rps = new reportStats(rp);
+ ReportStats rps = new ReportStats(rp);
reducer.preorderMap(rps);
} catch (Exception e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Aug 5 18:52:26 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
@@ -225,7 +225,7 @@ public class MapRecordProcessor extends
if (isLogInfoEnabled) {
logCloseInfo();
}
- reportStats rps = new reportStats(reporter);
+ ReportStats rps = new ReportStats(reporter);
mapOp.preorderMap(rps);
return;
} catch (Exception e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Aug 5 18:52:26 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
@@ -519,7 +519,7 @@ public class ReduceRecordProcessor exte
dummyOp.close(abort);
}
}
- reportStats rps = new reportStats(reporter);
+ ReportStats rps = new ReportStats(reporter);
reducer.preorderMap(rps);
} catch (Exception e) {