Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC
svn commit: r1522098 [16/30] - in /hive/branches/vectorization: ./
beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Sep 12 01:21:10 2013
@@ -32,8 +32,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -310,7 +310,7 @@ public class MapOperator extends Operato
tblRawRowObjectInspector =
(StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
partRawRowObjectInspector,
- tblDeserializer.getObjectInspector());
+ tblDeserializer.getObjectInspector(), true);
if (identityConverterTableDesc.contains(tableDesc)) {
if (!partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
@@ -347,6 +347,8 @@ public class MapOperator extends Operato
Path fpath = new Path(HiveConf.getVar(hconf,
HiveConf.ConfVars.HADOOPMAPFILENAME));
+ boolean schemeless = fpath.toUri().getScheme() == null;
+
List<Operator<? extends OperatorDesc>> children =
new ArrayList<Operator<? extends OperatorDesc>>();
@@ -358,6 +360,10 @@ public class MapOperator extends Operato
List<String> aliases = entry.getValue();
Path onepath = new Path(onefile);
+ if (schemeless) {
+ onepath = new Path(onepath.toUri().getPath());
+ }
+
PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
for (String onealias : aliases) {
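The schemeless check above addresses a mismatch: HiveConf.ConfVars.HADOOPMAPFILENAME carries the split path the mapper is currently reading, which may arrive without a scheme (no hdfs:// prefix), while the paths keyed in pathToPartitionInfo are usually fully qualified. Stripping the scheme and authority from the configured paths when the input path is schemeless keeps the alias lookup consistent. A minimal sketch of that normalization using only org.apache.hadoop.fs.Path; the helper name alignWithInput is made up for illustration:

    import org.apache.hadoop.fs.Path;

    public class PathNormalization {
      // Drop the scheme/authority from a configured partition path when the
      // runtime input path carries no scheme, so string-keyed lookups
      // (pathToPartitionInfo) line up. Helper name is illustrative only.
      static Path alignWithInput(Path configured, Path input) {
        boolean schemeless = input.toUri().getScheme() == null;
        return schemeless ? new Path(configured.toUri().getPath()) : configured;
      }

      public static void main(String[] args) {
        Path input = new Path("/user/hive/warehouse/t/000000_0");           // no scheme
        Path configured = new Path("hdfs://nn:8020/user/hive/warehouse/t"); // fully qualified
        System.out.println(alignWithInput(configured, input));              // /user/hive/warehouse/t
      }
    }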
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Sep 12 01:21:10 2013
@@ -58,7 +58,7 @@ public abstract class Operator<T extends
private static final long serialVersionUID = 1L;
- private Configuration configuration;
+ private transient Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
@@ -196,7 +196,7 @@ public abstract class Operator<T extends
}
// non-bean fields needed during compilation
- private transient RowSchema rowSchema;
+ private RowSchema rowSchema;
public void setSchema(RowSchema rowSchema) {
this.rowSchema = rowSchema;
@@ -592,6 +592,9 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
+ // call the operator specific close routine
+ closeOp(abort);
+
if (counterNameToEnum != null) {
incrCounter(numInputRowsCntr, inputRows);
incrCounter(numOutputRowsCntr, outputRows);
@@ -600,9 +603,6 @@ public abstract class Operator<T extends
LOG.info(id + " forwarded " + cntr + " rows");
- // call the operator specific close routine
- closeOp(abort);
-
try {
logStats();
if (childOperators == null) {
@@ -816,13 +816,7 @@ public abstract class Operator<T extends
}
}
- if (isLogInfoEnabled) {
- cntr++;
- if (cntr == nextCntr) {
- LOG.info(id + " forwarding " + cntr + " rows");
- nextCntr = getNextCntr(cntr);
- }
- }
+ increaseForward(1);
// For debugging purposes:
// System.out.println("" + this.getClass() + ": " +
@@ -855,6 +849,18 @@ public abstract class Operator<T extends
}
}
+ void increaseForward(long counter) {
+ if (isLogInfoEnabled) {
+ cntr += counter;
+ if (cntr >= nextCntr) {
+ LOG.info(id + " forwarding " + cntr + " rows");
+ do {
+ nextCntr = getNextCntr(nextCntr);
+ } while(cntr >= nextCntr);
+ }
+ }
+ }
+
public void resetStats() {
for (Enum<?> e : statsMap.keySet()) {
statsMap.get(e).set(0L);
@@ -1550,6 +1556,17 @@ public abstract class Operator<T extends
return true;
}
+ /**
+ * Used by LimitPushdownOptimizer.
+ *
+ * If none of the operators between the limit and the reduce-sink removes any input rows
+ * within the range of the limit count, the limit can be pushed down to the reduce-sink
+ * operator. Applies to forward, select, etc.
+ */
+ public boolean acceptLimitPushdown() {
+ return false;
+ }
+
@Override
public String toString() {
return getName() + "[" + getIdentifier() + "]";
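Two of the changes above interact: forwarding counts are now incremented through increaseForward(long), which can take batched increments (the reworked ReduceSinkOperator.collect passes blocks of 1000 rows through it), so the "forwarding N rows" threshold has to be advanced in a loop until it passes the accumulated count; otherwise a large batch could jump several thresholds and the next log line would fire immediately. A small self-contained sketch of that pattern; the multiply-by-ten schedule and the starting values stand in for getNextCntr, whose exact growth policy is not shown in this diff:

    // Sketch of the batched row-count logging performed by increaseForward(long).
    public class ForwardCounter {
      private long cntr = 0;
      private long nextCntr = 1;

      private long getNextCntr(long current) {
        return current * 10;          // assumed schedule, for illustration only
      }

      void increaseForward(long counter) {
        cntr += counter;
        if (cntr >= nextCntr) {
          System.out.println("forwarding " + cntr + " rows");
          do {
            nextCntr = getNextCntr(nextCntr);   // skip past thresholds already crossed
          } while (cntr >= nextCntr);
        }
      }

      public static void main(String[] args) {
        ForwardCounter fc = new ForwardCounter();
        fc.increaseForward(5);      // logs at 5, nextCntr becomes 10
        fc.increaseForward(2000);   // logs once, nextCntr jumps past 2005 to 10000
      }
    }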
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Sep 12 01:21:10 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
private static final long serialVersionUID = 1L;
PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
* 4. Create input partition to store rows coming from previous operator
*/
@Override
- protected void initializeOp(Configuration jobConf) throws HiveException
- {
+ protected void initializeOp(Configuration jobConf) throws HiveException {
hiveConf = new HiveConf(jobConf, PTFOperator.class);
// if the parent is ExtractOperator, this invocation is from reduce-side
Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
inputPart = createFirstPartitionForChain(
inputObjInspectors[0], hiveConf, isMapOperator);
- if (isMapOperator)
- {
+ if (isMapOperator) {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
outputObjInspector = tDef.getRawInputShape().getOI();
- }
- else
- {
+ } else {
outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
}
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
}
@Override
- protected void closeOp(boolean abort) throws HiveException
- {
+ protected void closeOp(boolean abort) throws HiveException {
super.closeOp(abort);
if(inputPart.size() != 0){
- if (isMapOperator)
- {
+ if (isMapOperator) {
processMapFunction();
- }
- else
- {
+ } else {
processInputPartition();
}
}
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
@Override
public void processOp(Object row, int tag) throws HiveException
{
- if (!isMapOperator )
- {
+ if (!isMapOperator ) {
/*
* check if current row belongs to the current accumulated Partition:
* - If not:
@@ -122,24 +109,19 @@ public class PTFOperator extends Operato
* - reset input Partition
* - set currentKey to the newKey if it is null or has changed.
*/
- newKeys.getNewKey(row, inputPart.getOI());
+ newKeys.getNewKey(row, inputPart.getInputOI());
boolean keysAreEqual = (currentKeys != null && newKeys != null)?
newKeys.equals(currentKeys) : false;
- if (currentKeys != null && !keysAreEqual)
- {
+ if (currentKeys != null && !keysAreEqual) {
processInputPartition();
inputPart.reset();
}
- if (currentKeys == null || !keysAreEqual)
- {
- if (currentKeys == null)
- {
+ if (currentKeys == null || !keysAreEqual) {
+ if (currentKeys == null) {
currentKeys = newKeys.copyKey();
- }
- else
- {
+ } else {
currentKeys.copyKey(newKeys);
}
}
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
* @param hiveConf
* @throws HiveException
*/
- protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
- {
+ protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
PTFDeserializer dS =
new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
dS.initializePTFChain(conf.getFuncDef());
}
- protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
- {
+ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
PartitionDef pDef = conf.getStartOfChain().getPartition();
ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
- for(int i=0; i<numExprs; i++)
- {
+ for(int i=0; i<numExprs; i++) {
PTFExpressionDef exprDef = exprs.get(i);
/*
* Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
newKeys = keyWrapperFactory.getKeyWrapper();
}
- protected void processInputPartition() throws HiveException
- {
+ protected void processInputPartition() throws HiveException {
PTFPartition outPart = executeChain(inputPart);
- if ( conf.forWindowing() ) {
- executeWindowExprs(outPart);
- }
- else {
- PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
- Object oRow = pItr.next();
- forward(oRow, outputObjInspector);
- }
- }
+ PTFPartitionIterator<Object> pItr = outPart.iterator();
+ while (pItr.hasNext()) {
+ Object oRow = pItr.next();
+ forward(oRow, outputObjInspector);
+ }
}
- protected void processMapFunction() throws HiveException
- {
+ protected void processMapFunction() throws HiveException {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
+ while (pItr.hasNext()) {
Object oRow = pItr.next();
forward(oRow, outputObjInspector);
}
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
@Override
- public OperatorType getType()
- {
+ public OperatorType getType() {
return OperatorType.PTF;
}
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
private PTFPartition executeChain(PTFPartition part)
- throws HiveException
- {
+ throws HiveException {
Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
PTFInputDef iDef = conf.getFuncDef();
- while (true)
- {
- if (iDef instanceof PartitionedTableFunctionDef)
- {
- fnDefs.push((PartitionedTableFunctionDef) iDef);
- iDef = ((PartitionedTableFunctionDef) iDef).getInput();
- }
- else
- {
- break;
- }
+
+ while (iDef instanceof PartitionedTableFunctionDef) {
+ fnDefs.push((PartitionedTableFunctionDef) iDef);
+ iDef = ((PartitionedTableFunctionDef) iDef).getInput();
}
PartitionedTableFunctionDef currFnDef;
- while (!fnDefs.isEmpty())
- {
+ while (!fnDefs.isEmpty()) {
currFnDef = fnDefs.pop();
part = currFnDef.getTFunction().execute(part);
}
return part;
}
- /**
- * If WindowingSpec contains any Windowing Expressions or has a
- * Having condition then these are processed
- * and forwarded on. Whereas when there is no having or WdwExprs
- * just forward the rows in the output Partition.
- *
- * For e.g. Consider the following query:
- * <pre>
- * {@code
- * select rank(), lead(rank(),1),...
- * from xyz
- * ...
- * having rank() > 1
- * }
- * </pre>
- * rank() gets processed as a WdwFn; Its in the oPart(output partition)
- * argument to executeWindowExprs. Here we first evaluate the having expression.
- * So the first row of oPart gets filtered out.
- * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
- *
- * @param ptfDesc
- * @param oPart output partition after Window Fns are processed.
- * @param op
- * @throws HiveException
- */
- private void executeWindowExprs(PTFPartition oPart)
- throws HiveException
- {
- WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
- /*
- * inputOI represents the row with WindowFn results present.
- * So in the e.g. above it will have a column for 'rank()'
- */
- StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
- /*
- * outputOI represents the final row with the Windowing Expressions added.
- * So in the e.g. above it will have a column for 'lead(rank())' in addition to
- * all columns in inputOI.
- */
- StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
- int numCols = outputOI.getAllStructFieldRefs().size();
- ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
- int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
- Object[] output = new Object[numCols];
-
- /*
- * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
- * we can just forward the row in the oPart partition.
- */
- boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
- PTFPartitionIterator<Object> pItr = oPart.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
- while (pItr.hasNext())
- {
- int colCnt = 0;
- Object oRow = pItr.next();
-
- /*
- * when there is no Windowing expressions or having;
- * just forward the Object coming out of the Partition.
- */
- if ( forwardRowsUntouched ) {
- forward(oRow, outputObjInspector);
- continue;
- }
-
- /*
- * Setup the output row columns in the following order
- * - the columns in the SelectList processed by the PTF
- * (ie the Select Exprs that have navigation expressions)
- * - the columns from the final PTF.
- */
-
- if ( wdwExprs != null ) {
- for (WindowExpressionDef wdwExpr : wdwExprs)
- {
- Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
- output[colCnt++] = newCol;
- }
- }
-
- for(; colCnt < numCols; ) {
- StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
- output[colCnt++] =
- ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
- field.getFieldObjectInspector());
- }
-
- forward(output, outputObjInspector);
- }
- }
/**
* Create a new Partition.
@@ -390,31 +258,31 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
- HiveConf hiveConf, boolean isMapSide) throws HiveException
- {
+ HiveConf hiveConf, boolean isMapSide) throws HiveException {
PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
TableFunctionEvaluator tEval = tabDef.getTFunction();
- String partClassName = tEval.getPartitionClass();
- int partMemSize = tEval.getPartitionMemSize();
PTFPartition part = null;
SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() :
tabDef.getRawInputShape().getSerde();
- part = new PTFPartition(partClassName, partMemSize, serde,
- (StructObjectInspector) oi);
+ StructObjectInspector outputOI = isMapSide ? tabDef.getInput().getOutputShape().getOI() :
+ tabDef.getRawInputShape().getOI();
+ part = PTFPartition.create(conf.getCfg(),
+ serde,
+ (StructObjectInspector) oi,
+ outputOI);
+
return part;
}
public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
- PTFPartitionIterator<Object> pItr) throws HiveException
- {
+ PTFPartitionIterator<Object> pItr) throws HiveException {
List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
if (llFnDescs == null) {
return;
}
- for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
- {
+ for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
.getGenericUDF();
llFn.setpItr(pItr);
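The rewritten executeChain keeps the same two-phase shape as before, minus the while(true)/break: walk the PTF definition chain from the outermost function down to the leaf, pushing each definition onto a stack, then pop and evaluate so the leaf function runs first and each result feeds the next function up the chain. A generic sketch of that pattern; Fn and apply() are stand-ins for PartitionedTableFunctionDef and TableFunctionEvaluator.execute, not Hive classes:

    import java.util.Stack;

    public class ChainUnwind {
      interface Fn { String apply(String partition); Fn input(); }

      static String executeChain(Fn outermost, String partition) {
        Stack<Fn> fns = new Stack<Fn>();
        for (Fn f = outermost; f != null; f = f.input()) {
          fns.push(f);                               // outermost ends up at the bottom
        }
        while (!fns.isEmpty()) {
          partition = fns.pop().apply(partition);    // innermost (leaf) executes first
        }
        return partition;
      }

      public static void main(String[] args) {
        final Fn leaf = new Fn() {
          public String apply(String p) { return p + " -> leaf"; }
          public Fn input() { return null; }
        };
        Fn outer = new Fn() {
          public String apply(String p) { return p + " -> windowing"; }
          public Fn input() { return leaf; }
        };
        System.out.println(executeChain(outer, "rows"));  // rows -> leaf -> windowing
      }
    }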
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Thu Sep 12 01:21:10 2013
@@ -20,227 +20,166 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.PTFPersistence.ByteBasedList;
+import org.apache.hadoop.hive.ql.exec.persistence.PTFRowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.Writable;
/*
* represents a collection of rows that is acted upon by a TableFunction or a WindowFunction.
*/
-public class PTFPartition
-{
- SerDe serDe;
- StructObjectInspector OI;
- private ByteBasedList elems;
- private Writable wRow;
- private int sz;
-
- public PTFPartition(HiveConf cfg, SerDe serDe, StructObjectInspector oI) throws HiveException
- {
- String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
- int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
- init(partitionClass, partitionMemSize, serDe, oI);
- }
-
- public PTFPartition(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
- {
- init(partitionClass, partitionMemSize, serDe, oI);
- }
+@SuppressWarnings("deprecation")
+public class PTFPartition {
+ protected static Log LOG = LogFactory.getLog(PTFPartition.class);
- private void init(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
- {
+ SerDe serDe;
+ StructObjectInspector inputOI;
+ StructObjectInspector outputOI;
+ private final PTFRowContainer<List<Object>> elems;
+
+ protected PTFPartition(HiveConf cfg,
+ SerDe serDe, StructObjectInspector inputOI,
+ StructObjectInspector outputOI)
+ throws HiveException {
this.serDe = serDe;
- OI = oI;
- elems = PTFPersistence.createList(partitionClass, partitionMemSize);
- sz = 0;
- wRow = createWritable();
+ this.inputOI = inputOI;
+ this.outputOI = outputOI;
+ int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+ elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
+ elems.setSerDe(serDe, outputOI);
+ elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
}
public void reset() throws HiveException {
- sz = 0;
- elems.reset(0);
+ elems.clear();
}
- public SerDe getSerDe()
- {
+ public SerDe getSerDe() {
return serDe;
}
- public void setSerDe(SerDe serDe)
- {
- this.serDe = serDe;
- }
- public StructObjectInspector getOI()
- {
- return OI;
- }
- public void setOI(StructObjectInspector oI)
- {
- OI = oI;
- }
- private Writable createWritable() throws HiveException
- {
- try
- {
- return serDe.getSerializedClass().newInstance();
- }
- catch(Throwable t)
- {
- throw new HiveException(t);
- }
+ public StructObjectInspector getInputOI() {
+ return inputOI;
}
- public Object getAt(int i) throws HiveException
- {
- try
- {
- elems.get(i, wRow);
- Object o = serDe.deserialize(wRow);
- return o;
- }
- catch(SerDeException se)
- {
- throw new HiveException(se);
- }
+ public StructObjectInspector getOutputOI() {
+ return outputOI;
}
- public Object getWritableAt(int i) throws HiveException
+ public Object getAt(int i) throws HiveException
{
- elems.get(i, wRow);
- return wRow;
+ return elems.getAt(i);
}
- public void append(Writable o) throws HiveException
- {
- elems.append(o);
- sz++;
- }
+ public void append(Object o) throws HiveException {
- public void append(Object o) throws HiveException
- {
- try
- {
- append(serDe.serialize(o, OI));
- }
- catch(SerDeException e)
- {
- throw new HiveException(e);
+ if ( elems.size() == Integer.MAX_VALUE ) {
+ throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
+ Integer.MAX_VALUE));
}
+
+ @SuppressWarnings("unchecked")
+ List<Object> l = (List<Object>)
+ ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
+ elems.add(l);
}
- public int size()
- {
- return sz;
+ public int size() {
+ return (int) elems.size();
}
- public PTFPartitionIterator<Object> iterator()
- {
+ public PTFPartitionIterator<Object> iterator() throws HiveException {
+ elems.first();
return new PItr(0, size());
}
- public PTFPartitionIterator<Object> range(int start, int end)
- {
- assert(start >= 0);
- assert(end <= size());
- assert(start <= end);
+ public PTFPartitionIterator<Object> range(int start, int end) {
+ assert (start >= 0);
+ assert (end <= size());
+ assert (start <= end);
return new PItr(start, end);
}
public void close() {
- elems.close();
+ try {
+ elems.close();
+ } catch (HiveException e) {
+ LOG.error(e.toString(), e);
+ }
}
- class PItr implements PTFPartitionIterator<Object>
- {
+ class PItr implements PTFPartitionIterator<Object> {
int idx;
final int start;
final int end;
final int createTimeSz;
- PItr(int start, int end)
- {
+ PItr(int start, int end) {
this.idx = start;
this.start = start;
this.end = end;
createTimeSz = PTFPartition.this.size();
}
- public boolean hasNext()
- {
- checkForComodification() ;
+ public boolean hasNext() {
+ checkForComodification();
return idx < end;
}
- public Object next()
- {
+ public Object next() {
checkForComodification();
- try
- {
+ try {
return PTFPartition.this.getAt(idx++);
- }
- catch(HiveException e)
- {
+ } catch (HiveException e) {
throw new RuntimeException(e);
}
}
- public void remove()
- {
+ public void remove() {
throw new UnsupportedOperationException();
}
- final void checkForComodification()
- {
- if (createTimeSz != PTFPartition.this.size()) {
- throw new ConcurrentModificationException();
- }
+ final void checkForComodification() {
+ if (createTimeSz != PTFPartition.this.size()) {
+ throw new ConcurrentModificationException();
+ }
}
@Override
- public int getIndex()
- {
+ public int getIndex() {
return idx;
}
- private Object getAt(int i)
- {
- try
- {
- return PTFPartition.this.getAt(i);
- }
- catch(HiveException e)
- {
- throw new RuntimeException(e);
- }
+ private Object getAt(int i) throws HiveException {
+ return PTFPartition.this.getAt(i);
}
@Override
- public Object lead(int amt)
- {
+ public Object lead(int amt) throws HiveException {
int i = idx + amt;
i = i >= end ? end - 1 : i;
return getAt(i);
}
@Override
- public Object lag(int amt)
- {
+ public Object lag(int amt) throws HiveException {
int i = idx - amt;
i = i < start ? start : i;
return getAt(i);
}
@Override
- public Object resetToIndex(int idx)
- {
- if ( idx < start || idx >= end )
- {
+ public Object resetToIndex(int idx) throws HiveException {
+ if (idx < start || idx >= end) {
return null;
}
Object o = getAt(idx);
@@ -249,14 +188,12 @@ public class PTFPartition
}
@Override
- public PTFPartition getPartition()
- {
+ public PTFPartition getPartition() {
return PTFPartition.this;
}
@Override
- public void reset()
- {
+ public void reset() {
idx = start;
}
};
@@ -266,24 +203,37 @@ public class PTFPartition
* Iterator exposes the index of the next location.
* Client can invoke lead/lag relative to the next location.
*/
- public static interface PTFPartitionIterator<T> extends Iterator<T>
- {
+ public static interface PTFPartitionIterator<T> extends Iterator<T> {
int getIndex();
- T lead(int amt);
+ T lead(int amt) throws HiveException;
- T lag(int amt);
+ T lag(int amt) throws HiveException;
/*
- * after a lead and lag call, allow Object associated with SerDe and writable associated with partition to be reset
+ * after a lead and lag call, allow Object associated with SerDe and writable associated with
+ * partition to be reset
* to the value for the current Index.
*/
- Object resetToIndex(int idx);
+ Object resetToIndex(int idx) throws HiveException;
PTFPartition getPartition();
- void reset();
+ void reset() throws HiveException;
}
+ public static PTFPartition create(HiveConf cfg,
+ SerDe serDe,
+ StructObjectInspector inputOI,
+ StructObjectInspector outputOI)
+ throws HiveException {
+ return new PTFPartition(cfg, serDe, inputOI, outputOI);
+ }
+
+ public static StructObjectInspector setupPartitionOutputOI(SerDe serDe,
+ StructObjectInspector tblFnOI) throws SerDeException {
+ return (StructObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(tblFnOI,
+ ObjectInspectorCopyOption.WRITABLE);
+ }
}
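The iterator semantics survive the move from PTFPersistence.ByteBasedList to PTFRowContainer: lead() and lag() clamp their target index to the [start, end) window, and any growth of the partition after the iterator was created trips a ConcurrentModificationException. A sketch of those semantics over a plain ArrayList, purely for illustration:

    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.List;

    public class WindowIterator<T> {
      private final List<T> elems;
      private final int start, end, createTimeSz;
      private int idx;

      WindowIterator(List<T> elems, int start, int end) {
        this.elems = elems;
        this.start = start;
        this.end = end;
        this.idx = start;
        this.createTimeSz = elems.size();
      }

      private void checkForComodification() {
        if (createTimeSz != elems.size()) {
          throw new ConcurrentModificationException();
        }
      }

      public boolean hasNext() { checkForComodification(); return idx < end; }
      public T next()          { checkForComodification(); return elems.get(idx++); }

      public T lead(int amt) {                 // peek forward, clamped to the last row
        return elems.get(Math.min(idx + amt, end - 1));
      }

      public T lag(int amt) {                  // peek backward, clamped to the first row
        return elems.get(Math.max(idx - amt, start));
      }

      public static void main(String[] args) {
        List<Integer> rows = new ArrayList<Integer>();
        for (int i = 0; i < 5; i++) rows.add(i);
        WindowIterator<Integer> it = new WindowIterator<Integer>(rows, 0, rows.size());
        it.next();                               // idx now 1
        System.out.println(it.lead(10));         // clamped to 4
        System.out.println(it.lag(10));          // clamped to 0
      }
    }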
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Thu Sep 12 01:21:10 2013
@@ -20,6 +20,13 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,13 +43,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-
public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
public static final Comparator<byte[]> C = new Comparator<byte[]>() {
@@ -51,7 +51,7 @@ public class PartitionKeySampler impleme
}
};
- private List<byte[]> sampled = new ArrayList<byte[]>();
+ private final List<byte[]> sampled = new ArrayList<byte[]>();
public void addSampleFile(Path inputPath, JobConf job) throws IOException {
FileSystem fs = inputPath.getFileSystem(job);
@@ -134,6 +134,8 @@ public class PartitionKeySampler impleme
private float samplePercent = 0.1f;
private final Random random = new Random();
+ private int sampled;
+
public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
super(work, job, operator, null);
}
@@ -148,12 +150,22 @@ public class PartitionKeySampler impleme
@Override
public boolean pushRow() throws IOException, HiveException {
- InspectableObject row = getNextRow();
- if (row != null && random.nextFloat() < samplePercent) {
- sampleNum--;
- pushRow(row);
+ if (!super.pushRow()) {
+ return false;
+ }
+ if (sampled < sampleNum) {
+ return true;
+ }
+ operator.flush();
+ return false;
+ }
+
+ @Override
+ protected void pushRow(InspectableObject row) throws HiveException {
+ if (random.nextFloat() < samplePercent) {
+ sampled++;
+ super.pushRow(row);
}
- return sampleNum > 0 && row != null;
}
}
}
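The reworked FetchSampler splits the two concerns that were previously combined in one method: pushRow(InspectableObject) keeps each fetched row with probability samplePercent, while pushRow() stops the fetch loop once sampleNum rows have been collected (flushing the downstream operator first). A minimal stand-alone sketch of that sampling loop; RowSampler and the String rows are illustrative, not Hive types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class RowSampler {
      private final Random random = new Random();
      private final float samplePercent;
      private final int sampleNum;
      private final List<String> sampled = new ArrayList<String>();

      RowSampler(float samplePercent, int sampleNum) {
        this.samplePercent = samplePercent;
        this.sampleNum = sampleNum;
      }

      // Keep the row with probability samplePercent; return false once enough
      // samples were collected, mirroring pushRow().
      boolean pushRow(String row) {
        if (random.nextFloat() < samplePercent) {
          sampled.add(row);
        }
        return sampled.size() < sampleNum;
      }

      public static void main(String[] args) {
        RowSampler sampler = new RowSampler(0.1f, 100);
        int consumed = 0;
        while (sampler.pushRow("row-" + consumed) && consumed < 1000000) {
          consumed++;
        }
        System.out.println("consumed " + consumed + " rows for "
            + sampler.sampled.size() + " samples");
      }
    }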
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Sep 12 01:21:10 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -43,14 +42,14 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
- implements Serializable {
+ implements Serializable, TopNHash.BinaryCollector {
private static final long serialVersionUID = 1L;
@@ -90,6 +89,9 @@ public class ReduceSinkOperator extends
return inputAlias;
}
+ // picks topN K:V pairs from input. can be null
+ private transient TopNHash reducerHash;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -131,6 +133,8 @@ public class ReduceSinkOperator extends
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
+ reducerHash = createTopKHash();
+
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
@@ -139,6 +143,23 @@ public class ReduceSinkOperator extends
}
}
+ private TopNHash createTopKHash() {
+ int limit = conf.getTopN();
+ float percent = conf.getTopNMemoryUsage();
+ if (limit < 0 || percent <= 0) {
+ return null;
+ }
+ if (limit == 0) {
+ return TopNHash.create0();
+ }
+ // limit * 64 : compensation of arrays for key/value/hashcodes
+ long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+ if (threshold < 0) {
+ return null;
+ }
+ return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+ }
+
transient InspectableObject tempInspectableObject = new InspectableObject();
protected transient HiveKey keyWritable = new HiveKey();
protected transient Writable value;
@@ -147,12 +168,24 @@ public class ReduceSinkOperator extends
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
- protected transient Object[][] cachedKeys;
protected transient Object[] cachedValues;
protected transient List<List<Integer>> distinctColIndices;
-
+ /**
+ * This two-dimensional array holds key data and a corresponding Union object
+ * which contains the tag identifying the aggregate expression for distinct columns.
+ *
+ * If there is no distinct expression, cachedKeys simply looks like this:
+ * cachedKeys[0] = [col0][col1]
+ *
+ * With two distinct expressions, a union(tag:key) is attached for each distinct expression:
+ * cachedKeys[0] = [col0][col1][0:dist1]
+ * cachedKeys[1] = [col0][col1][1:dist2]
+ *
+ * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+ * see {@link ExprNodeColumnEvaluator}
+ */
+ protected transient Object[][] cachedKeys;
boolean firstRow;
-
protected transient Random random;
/**
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends
}
@Override
+ @SuppressWarnings("unchecked")
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
}
- // Serialize the value
- value = valueSerializer.serialize(cachedValues, valueObjectInspector);
// Evaluate the keys
Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends
// no distinct key
System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
}
+
+ BytesWritable value = null;
// Serialize the keys and append the tag
for (int i = 0; i < cachedKeys.length; i++) {
if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends
}
}
keyWritable.setHashCode(keyHashCode);
- if (out != null) {
- out.collect(keyWritable, value);
- // Since this is a terminal operator, update counters explicitly -
- // forward is not called
- if (counterNameToEnum != null) {
- ++outputRows;
- if (outputRows % 1000 == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
+
+ if (reducerHash == null) {
+ if (null != out) {
+ collect(keyWritable, value = getValue(row, value));
+ }
+ } else {
+ int index = reducerHash.indexOf(keyWritable);
+ if (index == TopNHash.EXCLUDED) {
+ continue;
+ }
+ value = getValue(row, value);
+ if (index >= 0) {
+ reducerHash.set(index, value);
+ } else {
+ if (index == TopNHash.FORWARD) {
+ collect(keyWritable, value);
+ } else if (index == TopNHash.FLUSH) {
+ LOG.info("Top-N hash is flushed");
+ reducerHash.flush();
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ collect(keyWritable, value);
+ } else if (index == TopNHash.DISABLE) {
+ LOG.info("Top-N hash is disabled");
+ reducerHash.flush();
+ collect(keyWritable, value);
+ reducerHash = null;
}
}
}
}
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (IOException e) {
+ } catch (HiveException e) {
+ throw e;
+ } catch (Exception e) {
throw new HiveException(e);
}
}
+ public void collect(BytesWritable key, BytesWritable value) throws IOException {
+ // Since this is a terminal operator, update counters explicitly -
+ // forward is not called
+ out.collect(key, value);
+ if (++outputRows % 1000 == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ }
+ increaseForward(outputRows);
+ outputRows = 0;
+ }
+ }
+
+ // evaluate value lazily
+ private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+ if (value != null) {
+ return value;
+ }
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ if (!abort && reducerHash != null) {
+ try {
+ reducerHash.flush();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } finally {
+ reducerHash = null;
+ }
+ }
+ reducerHash = null;
+ super.closeOp(abort);
+ }
+
/**
* @return the name of the operator
*/
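Two ideas in the new top-N path are worth calling out. First, value serialization is deferred (getValue) until the key is known to survive the TopNHash, so rows excluded by the hash never pay the serialization cost. Second, createTopKHash only enables the hash when the configured memory fraction leaves a positive budget after reserving roughly 64 bytes per retained entry for the key/value/hashcode arrays, as the inline comment notes. A small sketch of that sizing rule; the numbers and names are illustrative:

    public class TopNSizing {
      /** Returns the memory budget for the hash, or -1 if top-N should be skipped. */
      static long topNThreshold(int limit, float memoryFraction, long maxHeapBytes) {
        if (limit < 0 || memoryFraction <= 0) {
          return -1;                                    // feature not requested
        }
        // reserve ~64 bytes per retained entry for key/value/hashcode arrays
        long threshold = (long) (memoryFraction * maxHeapBytes) - limit * 64L;
        return threshold < 0 ? -1 : threshold;          // too small to bother
      }

      public static void main(String[] args) {
        long heap = Runtime.getRuntime().maxMemory();
        System.out.println(topNThreshold(100, 0.10f, heap));          // usually positive
        System.out.println(topNThreshold(10000000, 0.0001f, heap));   // likely -1: skip top-N
      }
    }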
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Sep 12 01:21:10 2013
@@ -51,8 +51,10 @@ public class RowSchema implements Serial
@Override
public String toString() {
StringBuilder sb = new StringBuilder('(');
- for (ColumnInfo col: signature) {
- sb.append(col.toString());
+ if (signature != null) {
+ for (ColumnInfo col : signature) {
+ sb.append(col.toString());
+ }
}
sb.append(')');
return sb.toString();
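One detail worth noting about the surrounding method: new StringBuilder('(') does not seed the builder with an opening parenthesis; the char is widened to an int and used as the initial capacity (40), so the result ends with ")" but never starts with "(". The null guard added here is sound, but a fully correct version would also append the opening character explicitly, as in this sketch:

    public class RowSchemaToString {
      // Null-safe variant that actually emits the opening parenthesis.
      public static String toString(Iterable<?> signature) {
        StringBuilder sb = new StringBuilder();
        sb.append('(');
        if (signature != null) {
          for (Object col : signature) {
            sb.append(col);
          }
        }
        sb.append(')');
        return sb.toString();
      }

      public static void main(String[] args) {
        System.out.println(toString(java.util.Arrays.asList("a:int", "b:string"))); // (a:intb:string)
        System.out.println(toString(null));                                          // ()
      }
    }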
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -64,10 +64,10 @@ public class SMBMapJoinOperator extends
private MapredLocalWork localWork = null;
private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
- transient ArrayList<Object>[] keyWritables;
- transient ArrayList<Object>[] nextKeyWritables;
- RowContainer<ArrayList<Object>>[] nextGroupStorage;
- RowContainer<ArrayList<Object>>[] candidateStorage;
+ transient List<Object>[] keyWritables;
+ transient List<Object>[] nextKeyWritables;
+ RowContainer<List<Object>>[] nextGroupStorage;
+ RowContainer<List<Object>>[] candidateStorage;
transient String[] tagToAlias;
private transient boolean[] fetchDone;
@@ -136,12 +136,12 @@ public class SMBMapJoinOperator extends
}
for (byte pos = 0; pos < order.length; pos++) {
- RowContainer rc = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
nextGroupStorage[pos] = rc;
- RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> candidateRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
@@ -435,7 +435,7 @@ public class SMBMapJoinOperator extends
private void promoteNextGroupToCandidate(Byte t) throws HiveException {
this.keyWritables[t] = this.nextKeyWritables[t];
this.nextKeyWritables[t] = null;
- RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage[t];
+ RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
oldRowContainer.clear();
this.candidateStorage[t] = this.nextGroupStorage[t];
this.nextGroupStorage[t] = oldRowContainer;
@@ -479,10 +479,10 @@ public class SMBMapJoinOperator extends
private int[] findSmallestKey() {
int[] result = new int[order.length];
- ArrayList<Object> smallestOne = null;
+ List<Object> smallestOne = null;
for (byte pos = 0; pos < order.length; pos++) {
- ArrayList<Object> key = keyWritables[pos];
+ List<Object> key = keyWritables[pos];
if (key == null) {
continue;
}
@@ -501,7 +501,7 @@ public class SMBMapJoinOperator extends
private boolean processKey(byte alias, ArrayList<Object> key)
throws HiveException {
- ArrayList<Object> keyWritable = keyWritables[alias];
+ List<Object> keyWritable = keyWritables[alias];
if (keyWritable == null) {
//the first group.
keyWritables[alias] = key;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Sep 12 01:21:10 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
public boolean supportUnionRemoveOptimization() {
return true;
}
+
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Thu Sep 12 01:21:10 2013
@@ -19,12 +19,16 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.Serializable;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,12 +38,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -271,8 +275,6 @@ public class StatsTask extends Task<Stat
try {
// Stats setup:
Warehouse wh = new Warehouse(conf);
- FileSystem fileSys;
- FileStatus[] fileStatus;
if (!this.getWork().getNoStatsAggregator()) {
String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -322,16 +324,9 @@ public class StatsTask extends Task<Stat
if (!tableStatsExist && atomic) {
return 0;
}
- Path tablePath = wh.getTablePath(db.getDatabase(table.getDbName()), table.getTableName());
- fileSys = tablePath.getFileSystem(conf);
- fileStatus = Utilities.getFileStatusRecurse(tablePath, 1, fileSys);
-
- tblStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
- long tableSize = 0L;
- for (int i = 0; i < fileStatus.length; i++) {
- tableSize += fileStatus[i].getLen();
- }
- tblStats.setStat(StatsSetupConst.TOTAL_SIZE, tableSize);
+ long[] summary = summary(conf, table);
+ tblStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
+ tblStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
// In case of a non-partitioned table, the key for stats temporary store is "rootDir"
if (statsAggregator != null) {
@@ -403,18 +398,9 @@ public class StatsTask extends Task<Stat
}
}
- fileSys = partn.getPartitionPath().getFileSystem(conf);
- /* consider sub-directory created from list bucketing. */
- int listBucketingDepth = calculateListBucketingDMLDepth(partn);
- fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(),
- (1 + listBucketingDepth), fileSys);
- newPartStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
-
- long partitionSize = 0L;
- for (int i = 0; i < fileStatus.length; i++) {
- partitionSize += fileStatus[i].getLen();
- }
- newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, partitionSize);
+ long[] summary = summary(conf, partn);
+ newPartStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
+ newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
if (hasStats) {
PartitionStatistics oldPartStats = new PartitionStatistics(currentValues);
@@ -478,26 +464,103 @@ public class StatsTask extends Task<Stat
return ret;
}
- /**
- * List bucketing will introduce sub-directories.
- *
- * calculate it here in order to go to the leaf directory
- *
- * so that we can count right number of files.
- *
- * @param partn
- * @return
- */
- private int calculateListBucketingDMLDepth(Partition partn) {
- // list bucketing will introduce more files
- int listBucketingDepth = 0;
- if ((partn.getSkewedColNames() != null) && (partn.getSkewedColNames().size() > 0)
- && (partn.getSkewedColValues() != null) && (partn.getSkewedColValues().size() > 0)
- && (partn.getSkewedColValueLocationMaps() != null)
- && (partn.getSkewedColValueLocationMaps().size() > 0)) {
- listBucketingDepth = partn.getSkewedColNames().size();
+ private long[] summary(HiveConf conf, Partition partn) throws IOException {
+ Path path = partn.getPartitionPath();
+ FileSystem fs = path.getFileSystem(conf);
+ List<String> skewedColNames = partn.getSkewedColNames();
+ if (skewedColNames == null || skewedColNames.isEmpty()) {
+ return summary(fs, path);
+ }
+ List<List<String>> skewColValues = table.getSkewedColValues();
+ if (skewColValues == null || skewColValues.isEmpty()) {
+ return summary(fs, toDefaultLBPath(path));
+ }
+ return summary(fs, path, skewedColNames);
+ }
+
+ private long[] summary(HiveConf conf, Table table) throws IOException {
+ Path path = table.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ List<String> skewedColNames = table.getSkewedColNames();
+ if (skewedColNames == null || skewedColNames.isEmpty()) {
+ return summary(fs, path);
+ }
+ List<List<String>> skewColValues = table.getSkewedColValues();
+ if (skewColValues == null || skewColValues.isEmpty()) {
+ return summary(fs, toDefaultLBPath(path));
+ }
+ return summary(fs, path, table.getSkewedColNames());
+ }
+
+ private Path toDefaultLBPath(Path path) {
+ return new Path(path, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
+ }
+
+ private long[] summary(FileSystem fs, Path path) throws IOException {
+ try {
+ FileStatus status = fs.getFileStatus(path);
+ if (!status.isDir()) {
+ return new long[] {1, status.getLen()};
+ }
+ } catch (FileNotFoundException e) {
+ return new long[] {0, 0};
+ }
+ FileStatus[] children = fs.listStatus(path); // can be null
+ if (children == null) {
+ return new long[] {0, 0};
+ }
+ long numFiles = 0L;
+ long tableSize = 0L;
+ for (FileStatus child : children) {
+ if (!child.isDir()) {
+ tableSize += child.getLen();
+ numFiles++;
+ }
+ }
+ return new long[] {numFiles, tableSize};
+ }
+
+ private Pattern toPattern(List<String> skewCols) {
+ StringBuilder builder = new StringBuilder();
+ for (String skewCol : skewCols) {
+ if (builder.length() > 0) {
+ builder.append(Path.SEPARATOR_CHAR);
+ }
+ builder.append(skewCol).append('=');
+ builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*");
+ }
+ builder.append(Path.SEPARATOR_CHAR);
+ builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*$");
+ return Pattern.compile(builder.toString());
+ }
+
+ private long[] summary(FileSystem fs, Path path, List<String> skewCols) throws IOException {
+ long numFiles = 0L;
+ long tableSize = 0L;
+ Pattern pattern = toPattern(skewCols);
+ for (FileStatus status : Utilities.getFileStatusRecurse(path, skewCols.size() + 1, fs)) {
+ if (status.isDir()) {
+ continue;
+ }
+ String relative = toRelativePath(path, status.getPath());
+ if (relative == null) {
+ continue;
+ }
+ if (relative.startsWith(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME) ||
+ pattern.matcher(relative).matches()) {
+ tableSize += status.getLen();
+ numFiles++;
+ }
+ }
+ return new long[] {numFiles, tableSize};
+ }
+
+ private String toRelativePath(Path path1, Path path2) {
+ URI relative = path1.toUri().relativize(path2.toUri());
+ if (relative == path2.toUri()) {
+ return null;
}
- return listBucketingDepth;
+ return relative.getPath();
}
private boolean existStats(Map<String, String> parameters) {
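The summary(FileSystem, Path, List<String>) variant above recognises list-bucketing output by matching each file's path, taken relative to the partition directory, against either the default list-bucketing directory name or a pattern of one col=value segment per skewed column followed by the file name. The pattern construction is the interesting part; here is a self-contained version of it with a quick check (the column names are examples only):

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    public class SkewPathPattern {
      // One "col=<anything but '/'>" segment per skewed column, then the file name.
      static Pattern toPattern(List<String> skewCols) {
        StringBuilder builder = new StringBuilder();
        for (String skewCol : skewCols) {
          if (builder.length() > 0) {
            builder.append('/');
          }
          builder.append(skewCol).append('=').append("[^/]*");
        }
        builder.append('/').append("[^/]*$");
        return Pattern.compile(builder.toString());
      }

      public static void main(String[] args) {
        Pattern p = toPattern(Arrays.asList("country", "state"));
        System.out.println(p.matcher("country=US/state=CA/000000_0").matches()); // true
        System.out.println(p.matcher("country=US/000000_0").matches());          // false
      }
    }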
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -241,6 +242,7 @@ public class TableScanOperator extends O
// and 2) it will fail some join and union queries if this is added forcibly
// into tableScanDesc
java.util.ArrayList<Integer> neededColumnIDs;
+ List<String> neededColumns;
public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
neededColumnIDs = orign_columns;
@@ -250,6 +252,14 @@ public class TableScanOperator extends O
return neededColumnIDs;
}
+ public void setNeededColumns(List<String> columnNames) {
+ neededColumns = columnNames;
+ }
+
+ public List<String> getNeededColumns() {
+ return neededColumns;
+ }
+
@Override
public OperatorType getType() {
return OperatorType.TABLESCAN;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Thu Sep 12 01:21:10 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.ap
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -66,7 +65,7 @@ public class UDTFOperator extends Operat
// Make an object inspector [] of the arguments to the UDTF
List<? extends StructField> inputFields =
- ((StandardStructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
+ ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
udtfInputOIs = new ObjectInspector[inputFields.size()];
for (int i = 0; i < inputFields.size(); i++) {
@@ -104,7 +103,7 @@ public class UDTFOperator extends Operat
@Override
public void processOp(Object row, int tag) throws HiveException {
// The UDTF expects arguments in an object[]
- StandardStructObjectInspector soi = (StandardStructObjectInspector) inputObjInspectors[tag];
+ StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
List<? extends StructField> fields = soi.getAllStructFieldRefs();
for (int i = 0; i < fields.size(); i++) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Sep 12 01:21:10 2013
@@ -102,6 +102,8 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -112,6 +114,13 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -165,6 +174,10 @@ import org.apache.hadoop.mapred.Sequence
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
/**
* Utilities.
*
@@ -252,7 +265,7 @@ public final class Utilities {
return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
}
- public static BaseWork getBaseWork(Configuration conf, String name) {
+ private static BaseWork getBaseWork(Configuration conf, String name) {
BaseWork gWork = null;
Path path = null;
try {
@@ -260,16 +273,32 @@ public final class Utilities {
assert path != null;
gWork = gWorkMap.get(path);
if (gWork == null) {
- String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
Path localPath;
- if (jtConf.equals("local")) {
+ if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
localPath = path;
} else {
localPath = new Path(name);
}
InputStream in = new FileInputStream(localPath.toUri().getPath());
- BaseWork ret = deserializeObject(in);
- gWork = ret;
+ if(MAP_PLAN_NAME.equals(name)){
+ if (ExecMapper.class.getName().equals(conf.get("mapred.mapper.class"))){
+ gWork = deserializePlan(in, MapWork.class, conf);
+ } else if(RCFileMergeMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+ gWork = deserializePlan(in, MergeWork.class, conf);
+ } else if(ColumnTruncateMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+ gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
+ } else if(PartialScanMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+ gWork = deserializePlan(in, PartialScanWork.class,conf);
+ } else {
+ assert false;
+ }
+ } else {
+ if(ExecReducer.class.getName().equals(conf.get("mapred.reducer.class"))) {
+ gWork = deserializePlan(in, ReduceWork.class, conf);
+ } else {
+ assert false;
+ }
+ }
gWorkMap.put(path, gWork);
}
return gWork;
@@ -479,7 +508,7 @@ public final class Utilities {
// use the default file system of the conf
FileSystem fs = planPath.getFileSystem(conf);
FSDataOutputStream out = fs.create(planPath);
- serializeObject(w, out);
+ serializePlan(w, out, conf);
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
@@ -586,11 +615,82 @@ public final class Utilities {
}
}
+ /** Custom Kryo serializer for java.sql.Date; otherwise Kryo gets confused
+ * between java.sql.Date and java.util.Date while deserializing.
+ */
+ private static class SqlDateSerializer extends
+ com.esotericsoftware.kryo.Serializer<java.sql.Date> {
+
+ @Override
+ public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
+ return new java.sql.Date(input.readLong());
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
+ output.writeLong(sqlDate.getTime());
+ }
+
+ }
+
+ /**
+ * Serializes the plan.
+ * @param plan The plan, such as QueryPlan, MapredWork, etc.
+ * @param out The stream to write to.
+ * @param conf Used to pick which serialization format is desired.
+ */
+ public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.SERIALIZE_PLAN);
+ if(conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo").equals("javaXML")) {
+ serializeObjectByJavaXML(plan, out);
+ } else {
+ serializeObjectByKryo(plan, out);
+ }
+ perfLogger.PerfLogEnd(LOG, PerfLogger.SERIALIZE_PLAN);
+ }
+
+ /**
+ * Deserializes the plan.
+ * @param in The stream to read from.
+ * @param planClass The class of the plan to deserialize into.
+ * @param conf Used to determine which serialization format the plan is in.
+ * @return The plan, such as QueryPlan, MapredWork, etc.
+ */
+ public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.DESERIALIZE_PLAN);
+ T plan;
+ if(conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo").equals("javaXML")) {
+ plan = deserializeObjectByJavaXML(in);
+ } else {
+ plan = deserializeObjectByKryo(in, planClass);
+ }
+ perfLogger.PerfLogEnd(LOG, PerfLogger.DESERIALIZE_PLAN);
+ return plan;
+ }
+
+ /**
+ * Clones using the powers of XML. Do not use unless necessary.
+ * @param plan The plan.
+ * @return The clone.
+ */
+ public static MapredWork clonePlan(MapredWork plan) {
+ // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.CLONE_PLAN);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Utilities.serializeObjectByJavaXML(plan, baos);
+ MapredWork newPlan = Utilities.deserializeObjectByJavaXML(
+ new ByteArrayInputStream(baos.toByteArray()));
+ perfLogger.PerfLogEnd(LOG, PerfLogger.CLONE_PLAN);
+ return newPlan;
+ }
+
/**
* Serialize the object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
- public static void serializeObject(Object plan, OutputStream out) {
+ private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
e.setExceptionListener(new ExceptionListener() {
public void exceptionThrown(Exception e) {
@@ -613,11 +713,21 @@ public final class Utilities {
}
/**
+ * @param plan Usually of type MapredWork, MapredLocalWork, etc.
+ * @param out The stream the serialized plan is written to.
+ */
+ private static void serializeObjectByKryo(Object plan, OutputStream out) {
+ Output output = new Output(out);
+ kryo.get().writeObject(output, plan);
+ output.close();
+ }
+
+ /**
* De-serialize an object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
@SuppressWarnings("unchecked")
- public static <T> T deserializeObject(InputStream in) {
+ private static <T> T deserializeObjectByJavaXML(InputStream in) {
XMLDecoder d = null;
try {
d = new XMLDecoder(in, null, null);
@@ -629,6 +739,27 @@ public final class Utilities {
}
}
+ private static <T> T deserializeObjectByKryo(InputStream in, Class<T> clazz ) {
+ Input inp = new Input(in);
+ T t = kryo.get().readObject(inp,clazz);
+ inp.close();
+ return t;
+ }
+
+ // Kryo is not thread-safe, and creating a new Kryo instance is expensive,
+ // so we keep exactly one instance per thread.
+ private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+
+ @Override
+ protected synchronized Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ kryo.register(java.sql.Date.class, new SqlDateSerializer());
+ return kryo;
+ };
+ };
+
+
public static TableDesc defaultTd;
static {
// by default we expect ^A separated strings
@@ -1802,6 +1933,8 @@ public final class Utilities {
*/
public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
throws IOException {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.INPUT_SUMMARY);
long[] summary = {0, 0, 0};
@@ -1937,6 +2070,7 @@ public final class Utilities {
+ cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
}
+ perfLogger.PerfLogEnd(LOG, PerfLogger.INPUT_SUMMARY);
return new ContentSummary(summary[0], summary[1], summary[2]);
} finally {
HiveInterruptUtils.remove(interrup);
@@ -2090,7 +2224,7 @@ public final class Utilities {
if (columnTypes.length() > 0) {
columnTypes.append(",");
}
- columnTypes.append(colInfo.getType().getTypeName());
+ columnTypes.append(colInfo.getTypeName());
}
String columnTypesString = columnTypes.toString();
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString);
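
Taken together, the Utilities changes above swap the Java-XML plan (de)serializers for Kryo behind the PLAN_SERIALIZATION HiveConf switch (default "kryo", with "javaXML" as the fallback), keep a single Kryo instance per thread, and register a custom serializer so java.sql.Date survives the round trip. The following is a runnable sketch of that round trip, assuming Kryo 2.x on the classpath and using a made-up PlanStub in place of a real MapWork or ReduceWork; it illustrates the pattern rather than reproducing the Hive code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class KryoPlanRoundTrip {

      // Hypothetical stand-in for a plan object; Kryo needs a no-arg constructor.
      public static class PlanStub {
        public String queryId;
        public java.sql.Date asOfDate;
      }

      // One Kryo per thread: Kryo is not thread-safe and is expensive to create.
      private static final ThreadLocal<Kryo> KRYO = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
          Kryo kryo = new Kryo();
          // Without this, Kryo can hand back a java.util.Date for a java.sql.Date field.
          kryo.register(java.sql.Date.class, new Serializer<java.sql.Date>() {
            @Override
            public void write(Kryo k, Output out, java.sql.Date d) {
              out.writeLong(d.getTime());
            }
            @Override
            public java.sql.Date read(Kryo k, Input in, Class<java.sql.Date> type) {
              return new java.sql.Date(in.readLong());
            }
          });
          return kryo;
        }
      };

      public static void main(String[] args) {
        PlanStub plan = new PlanStub();
        plan.queryId = "q1";
        plan.asOfDate = java.sql.Date.valueOf("2013-09-12");

        // Serialize: same call shape as serializeObjectByKryo above.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        KRYO.get().writeObject(output, plan);
        output.close();

        // Deserialize: the caller names the target class, as in deserializeObjectByKryo.
        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        PlanStub copy = KRYO.get().readObject(input, PlanStub.class);
        input.close();

        System.out.println(copy.queryId + " " + copy.asOfDate);
      }
    }
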
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Sep 12 01:21:10 2013
@@ -717,12 +717,12 @@ public class ExecDriver extends Task<Map
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+ MapredLocalWork plan = Utilities.deserializePlan(pathData, MapredLocalWork.class, conf);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext());
} else {
- MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+ MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
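
The ExecDriver changes show the practical effect of the typed API: the old deserializeObject returned an untyped object that the caller had to cast, while Kryo must be told the target class up front. In sketch form, with pathData and conf in scope as in the hunk above:

    // before: MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
    // after:  the target class is passed explicitly and no cast is needed
    MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf);
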
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Thu Sep 12 01:21:10 2013
@@ -35,10 +35,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.session.SessionState;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Thu Sep 12 01:21:10 2013
@@ -39,14 +39,11 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.JobConf;
/**
* Extension of ExecDriver:
* - can optionally spawn a map-reduce task from a separate jvm
@@ -69,7 +66,6 @@ public class MapRedTask extends ExecDriv
private transient ContentSummary inputSummary = null;
private transient boolean runningViaChild = false;
- private transient boolean inputSizeEstimated = false;
private transient long totalInputFileSize;
private transient long totalInputNumFiles;
@@ -79,10 +75,6 @@ public class MapRedTask extends ExecDriv
super();
}
- public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
- throw new RuntimeException("Illegal Constructor call");
- }
-
@Override
public int execute(DriverContext driverContext) {
@@ -181,7 +173,7 @@ public class MapRedTask extends ExecDriv
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeObject(plan, out);
+ Utilities.serializePlan(plan, out, conf);
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
@@ -408,7 +400,7 @@ public class MapRedTask extends ExecDriv
if (inputSummary == null) {
inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
- int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
work.isFinalMapRed());
rWork.setNumReduceTasks(reducers);
console
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Thu Sep 12 01:21:10 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.m
import java.io.File;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
@@ -52,9 +53,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -134,14 +134,13 @@ public class MapredLocalTask extends Tas
String hiveJar = conf.getJar();
String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
- String libJarsOption;
// write out the plan to a local file
Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredLocalWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeObject(plan, out);
+ Utilities.serializePlan(plan, out, conf);
String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
@@ -319,14 +318,13 @@ public class MapredLocalTask extends Tas
long elapsed = currentTime - startTime;
console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
+ Utilities.showTime(elapsed) + " sec.");
- } catch (Throwable e) {
- if (e instanceof OutOfMemoryError
- || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
- // Don't create a new object if we are already out of memory
+ } catch (Throwable throwable) {
+ if (throwable instanceof OutOfMemoryError
+ || (throwable instanceof MapJoinMemoryExhaustionException)) {
+ l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
return 3;
} else {
- l4j.error("Hive Runtime Error: Map local work failed");
- e.printStackTrace();
+ l4j.error("Hive Runtime Error: Map local work failed", throwable);
return 2;
}
}
@@ -336,7 +334,6 @@ public class MapredLocalTask extends Tas
private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
throws Exception {
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- int fetchOpRows = 0;
String alias = entry.getKey();
FetchOperator fetchOp = entry.getValue();
@@ -364,7 +361,6 @@ public class MapredLocalTask extends Tas
forwardOp.close(false);
break;
}
- fetchOpRows++;
forwardOp.process(row.o, 0);
// check if any operator had a fatal error or early exit during
// execution
@@ -425,7 +421,8 @@ public class MapredLocalTask extends Tas
}
}
- private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
+ private void generateDummyHashTable(String alias, String bigBucketFileName)
+ throws HiveException,IOException {
// find the (byte)tag for the map join(HashTableSinkOperator)
Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
@@ -442,8 +439,6 @@ public class MapredLocalTask extends Tas
// generate empty hashtable for this (byte)tag
String tmpURI = this.getWork().getTmpFileURI();
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable =
- new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
String fileName = work.getBucketFileName(bigBucketFileName);
@@ -453,12 +448,14 @@ public class MapredLocalTask extends Tas
console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
Path path = new Path(tmpURIPath);
FileSystem fs = path.getFileSystem(job);
- File file = new File(path.toUri().getPath());
- fs.create(path);
- long fileLength = hashTable.flushMemoryCacheToPersistent(file);
+ ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
+ try {
+ MapJoinTableContainerSerDe.persistDummyTable(out);
+ } finally {
+ out.close();
+ }
console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
- + fileLength);
- hashTable.close();
+ + fs.getFileStatus(path).getLen());
}
private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
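
The new generateDummyHashTable path writes the empty map-join table straight through the destination FileSystem with an ObjectOutputStream guarded by try/finally, and reports the upload size from the file status rather than a local java.io.File. A self-contained sketch of that write pattern, assuming only hadoop-common and using a plain HashMap as a stand-in for whatever MapJoinTableContainerSerDe.persistDummyTable actually emits:

    import java.io.ObjectOutputStream;
    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DumpEmptyTable {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/dummy_hashtable.bin");   // hypothetical location
        FileSystem fs = path.getFileSystem(conf);

        ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
        try {
          out.writeObject(new HashMap<String, String>());   // empty stand-in "table"
        } finally {
          out.close();   // closed even if the write throws, as in the hunk above
        }
        System.out.println("Wrote " + fs.getFileStatus(path).getLen() + " bytes to " + path);
      }
    }
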
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Thu Sep 12 01:21:10 2013
@@ -20,17 +20,17 @@ package org.apache.hadoop.hive.ql.exec.p
import org.apache.hadoop.hive.ql.metadata.HiveException;
-public abstract class AbstractRowContainer<Row> {
+public abstract class AbstractRowContainer<ROW> {
public AbstractRowContainer() {
}
- public abstract void add(Row t) throws HiveException;
+ public abstract void add(ROW t) throws HiveException;
- public abstract Row first() throws HiveException;
+ public abstract ROW first() throws HiveException;
- public abstract Row next() throws HiveException;
+ public abstract ROW next() throws HiveException;
/**
* Get the number of elements in the RowContainer.
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Thu Sep 12 01:21:10 2013
@@ -18,26 +18,14 @@
package org.apache.hadoop.hive.ql.exec.persistence;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.text.NumberFormat;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
/**
@@ -47,26 +35,17 @@ import org.apache.hadoop.hive.ql.session
* hash table.
*/
-public class HashMapWrapper<K, V> implements Serializable {
+public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
private static final long serialVersionUID = 1L;
- protected Log LOG = LogFactory.getLog(this.getClass().getName());
+ protected static final Log LOG = LogFactory.getLog(HashMapWrapper.class);
// default threshold for using main memory based HashMap
-
+ private static final String THESHOLD_NAME = "threshold";
+ private static final String LOAD_NAME = "load";
private static final int THRESHOLD = 1000000;
private static final float LOADFACTOR = 0.75f;
- private static final float MEMORYUSAGE = 1;
-
- private float maxMemoryUsage;
- private HashMap<K, V> mHash; // main memory HashMap
- protected transient LogHelper console;
-
- private File dumpFile;
- public static MemoryMXBean memoryMXBean;
- private long maxMemory;
- private long currentMemory;
- private NumberFormat num;
+ private HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
/**
* Constructor.
@@ -74,163 +53,53 @@ public class HashMapWrapper<K, V> implem
* @param threshold
* User specified threshold to store new values into persistent storage.
*/
- public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
- maxMemoryUsage = memoryUsage;
- mHash = new HashMap<K, V>(threshold, loadFactor);
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
- LOG.info("maximum memory: " + maxMemory);
- num = NumberFormat.getInstance();
- num.setMinimumFractionDigits(2);
+ public HashMapWrapper(int threshold, float loadFactor) {
+ super(createConstructorMetaData(threshold, loadFactor));
+ mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
+
+ }
+
+ public HashMapWrapper(Map<String, String> metaData) {
+ super(metaData);
+ int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
+ float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
+ mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
}
public HashMapWrapper(int threshold) {
- this(threshold, LOADFACTOR, MEMORYUSAGE);
+ this(threshold, LOADFACTOR);
}
public HashMapWrapper() {
- this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
+ this(THRESHOLD, LOADFACTOR);
}
- public V get(K key) {
+ @Override
+ public MapJoinRowContainer get(MapJoinKey key) {
return mHash.get(key);
}
- public boolean put(K key, V value) throws HiveException {
- // isAbort();
+ @Override
+ public void put(MapJoinKey key, MapJoinRowContainer value) {
mHash.put(key, value);
- return false;
- }
-
-
- public void remove(K key) {
- mHash.remove(key);
- }
-
- /**
- * Flush the main memory hash table into the persistent cache file
- *
- * @return persistent cache file
- */
- public long flushMemoryCacheToPersistent(File file) throws IOException {
- ObjectOutputStream outputStream = null;
- outputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4096));
- outputStream.writeObject(mHash);
- outputStream.flush();
- outputStream.close();
-
- return file.length();
- }
-
- public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
- ObjectInputStream inputStream = null;
- inputStream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName), 4096));
- HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
- this.setMHash(hashtable);
-
- inputStream.close();
}
+ @Override
public int size() {
return mHash.size();
}
-
- public Set<K> keySet() {
- return mHash.keySet();
- }
-
-
- /**
- * Close the persistent hash table and clean it up.
- *
- * @throws HiveException
- */
- public void close() throws HiveException {
- mHash.clear();
+ @Override
+ public Set<Entry<MapJoinKey, MapJoinRowContainer>> entrySet() {
+ return mHash.entrySet();
}
-
- public void clear() throws HiveException {
+ @Override
+ public void clear() {
mHash.clear();
}
-
- public int getKeySize() {
- return mHash.size();
- }
-
- public boolean isAbort(long numRows,LogHelper console) {
- int size = mHash.size();
- long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
- double rate = (double) usedMemory / (double) maxMemory;
- console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
- + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
- if (rate > (double) maxMemoryUsage) {
- return true;
- }
- return false;
- }
-
- public void setLOG(Log log) {
- LOG = log;
- }
-
- public HashMap<K, V> getMHash() {
- return mHash;
+ private static Map<String, String> createConstructorMetaData(int threshold, float loadFactor) {
+ Map<String, String> metaData = new HashMap<String, String>();
+ metaData.put(THESHOLD_NAME, String.valueOf(threshold));
+ metaData.put(LOAD_NAME, String.valueOf(loadFactor));
+ return metaData;
}
-
- public void setMHash(HashMap<K, V> hash) {
- mHash = hash;
- }
-
- public LogHelper getConsole() {
- return console;
- }
-
- public void setConsole(LogHelper console) {
- this.console = console;
- }
-
- public File getDumpFile() {
- return dumpFile;
- }
-
- public void setDumpFile(File dumpFile) {
- this.dumpFile = dumpFile;
- }
-
- public static MemoryMXBean getMemoryMXBean() {
- return memoryMXBean;
- }
-
- public static void setMemoryMXBean(MemoryMXBean memoryMXBean) {
- HashMapWrapper.memoryMXBean = memoryMXBean;
- }
-
- public long getMaxMemory() {
- return maxMemory;
- }
-
- public void setMaxMemory(long maxMemory) {
- this.maxMemory = maxMemory;
- }
-
- public long getCurrentMemory() {
- return currentMemory;
- }
-
- public void setCurrentMemory(long currentMemory) {
- this.currentMemory = currentMemory;
- }
-
- public NumberFormat getNum() {
- return num;
- }
-
- public void setNum(NumberFormat num) {
- this.num = num;
- }
-
- public static int getTHRESHOLD() {
- return THRESHOLD;
- }
-
}
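
The rewritten HashMapWrapper no longer tracks memory usage or spills to disk itself; it is a thin in-memory AbstractMapJoinTableContainer whose construction parameters travel as string metadata (the THESHOLD_NAME/LOAD_NAME entries above), so an equivalent container can be rebuilt on the other side of serialization. A small sketch of that metadata round trip, using a made-up Container class rather than the real AbstractMapJoinTableContainer API:

    import java.util.HashMap;
    import java.util.Map;

    public class MetaDataRoundTrip {

      static class Container {
        final Map<String, String> metaData;
        final HashMap<String, String> table;

        // Fresh container: record the sizing parameters it was built with.
        Container(int threshold, float loadFactor) {
          metaData = new HashMap<String, String>();
          metaData.put("threshold", String.valueOf(threshold));
          metaData.put("load", String.valueOf(loadFactor));
          table = new HashMap<String, String>(threshold, loadFactor);
        }

        // Rebuilt container (e.g. after deserialization): same parameters, read back.
        Container(Map<String, String> metaData) {
          this.metaData = metaData;
          int threshold = Integer.parseInt(metaData.get("threshold"));
          float loadFactor = Float.parseFloat(metaData.get("load"));
          table = new HashMap<String, String>(threshold, loadFactor);
        }
      }

      public static void main(String[] args) {
        Container original = new Container(1000000, 0.75f);
        Container rebuilt  = new Container(original.metaData);
        System.out.println(rebuilt.table.size());   // 0 -- same sizing, empty table
      }
    }
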