You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/29 17:44:42 UTC
svn commit: r1518680 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/ optimizer/ parse/ plan/ udf/ptf/
Author: hashutosh
Date: Thu Aug 29 15:44:42 2013
New Revision: 1518680
URL: http://svn.apache.org/r1518680
Log:
HIVE-4964 : Cleanup PTF code: remove code dealing with non-standard SQL behavior we had originally introduced (Harish Butani via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Aug 29 15:44:42 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
private static final long serialVersionUID = 1L;
PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
* 4. Create input partition to store rows coming from previous operator
*/
@Override
- protected void initializeOp(Configuration jobConf) throws HiveException
- {
+ protected void initializeOp(Configuration jobConf) throws HiveException {
hiveConf = new HiveConf(jobConf, PTFOperator.class);
// if the parent is ExtractOperator, this invocation is from reduce-side
Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
inputPart = createFirstPartitionForChain(
inputObjInspectors[0], hiveConf, isMapOperator);
- if (isMapOperator)
- {
+ if (isMapOperator) {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
outputObjInspector = tDef.getRawInputShape().getOI();
- }
- else
- {
+ } else {
outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
}
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
}
@Override
- protected void closeOp(boolean abort) throws HiveException
- {
+ protected void closeOp(boolean abort) throws HiveException {
super.closeOp(abort);
if(inputPart.size() != 0){
- if (isMapOperator)
- {
+ if (isMapOperator) {
processMapFunction();
- }
- else
- {
+ } else {
processInputPartition();
}
}
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
@Override
public void processOp(Object row, int tag) throws HiveException
{
- if (!isMapOperator )
- {
+ if (!isMapOperator ) {
/*
* checkif current row belongs to the current accumulated Partition:
* - If not:
@@ -126,20 +113,15 @@ public class PTFOperator extends Operato
boolean keysAreEqual = (currentKeys != null && newKeys != null)?
newKeys.equals(currentKeys) : false;
- if (currentKeys != null && !keysAreEqual)
- {
+ if (currentKeys != null && !keysAreEqual) {
processInputPartition();
inputPart.reset();
}
- if (currentKeys == null || !keysAreEqual)
- {
- if (currentKeys == null)
- {
+ if (currentKeys == null || !keysAreEqual) {
+ if (currentKeys == null) {
currentKeys = newKeys.copyKey();
- }
- else
- {
+ } else {
currentKeys.copyKey(newKeys);
}
}
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
* @param hiveConf
* @throws HiveException
*/
- protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
- {
+ protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
PTFDeserializer dS =
new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
dS.initializePTFChain(conf.getFuncDef());
}
- protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
- {
+ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
PartitionDef pDef = conf.getStartOfChain().getPartition();
ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
- for(int i=0; i<numExprs; i++)
- {
+ for(int i=0; i<numExprs; i++) {
PTFExpressionDef exprDef = exprs.get(i);
/*
* Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
newKeys = keyWrapperFactory.getKeyWrapper();
}
- protected void processInputPartition() throws HiveException
- {
+ protected void processInputPartition() throws HiveException {
PTFPartition outPart = executeChain(inputPart);
- if ( conf.forWindowing() ) {
- executeWindowExprs(outPart);
- }
- else {
- PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
- Object oRow = pItr.next();
- forward(oRow, outputObjInspector);
- }
- }
+ PTFPartitionIterator<Object> pItr = outPart.iterator();
+ while (pItr.hasNext()) {
+ Object oRow = pItr.next();
+ forward(oRow, outputObjInspector);
+ }
}
- protected void processMapFunction() throws HiveException
- {
+ protected void processMapFunction() throws HiveException {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
+ while (pItr.hasNext()) {
Object oRow = pItr.next();
forward(oRow, outputObjInspector);
}
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
@Override
- public OperatorType getType()
- {
+ public OperatorType getType() {
return OperatorType.PTF;
}
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
private PTFPartition executeChain(PTFPartition part)
- throws HiveException
- {
+ throws HiveException {
Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
PTFInputDef iDef = conf.getFuncDef();
- while (true)
- {
- if (iDef instanceof PartitionedTableFunctionDef)
- {
- fnDefs.push((PartitionedTableFunctionDef) iDef);
- iDef = ((PartitionedTableFunctionDef) iDef).getInput();
- }
- else
- {
- break;
- }
+
+ while (iDef instanceof PartitionedTableFunctionDef) {
+ fnDefs.push((PartitionedTableFunctionDef) iDef);
+ iDef = ((PartitionedTableFunctionDef) iDef).getInput();
}
PartitionedTableFunctionDef currFnDef;
- while (!fnDefs.isEmpty())
- {
+ while (!fnDefs.isEmpty()) {
currFnDef = fnDefs.pop();
part = currFnDef.getTFunction().execute(part);
}
return part;
}
- /**
- * If WindowingSpec contains any Windowing Expressions or has a
- * Having condition then these are processed
- * and forwarded on. Whereas when there is no having or WdwExprs
- * just forward the rows in the output Partition.
- *
- * For e.g. Consider the following query:
- * <pre>
- * {@code
- * select rank(), lead(rank(),1),...
- * from xyz
- * ...
- * having rank() > 1
- * }
- * </pre>
- * rank() gets processed as a WdwFn; Its in the oPart(output partition)
- * argument to executeWindowExprs. Here we first evaluate the having expression.
- * So the first row of oPart gets filtered out.
- * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
- *
- * @param ptfDesc
- * @param oPart output partition after Window Fns are processed.
- * @param op
- * @throws HiveException
- */
- private void executeWindowExprs(PTFPartition oPart)
- throws HiveException
- {
- WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
- /*
- * inputOI represents the row with WindowFn results present.
- * So in the e.g. above it will have a column for 'rank()'
- */
- StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
- /*
- * outputOI represents the final row with the Windowing Expressions added.
- * So in the e.g. above it will have a column for 'lead(rank())' in addition to
- * all columns in inputOI.
- */
- StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
- int numCols = outputOI.getAllStructFieldRefs().size();
- ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
- int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
- Object[] output = new Object[numCols];
-
- /*
- * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
- * we can just forward the row in the oPart partition.
- */
- boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
- PTFPartitionIterator<Object> pItr = oPart.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
- while (pItr.hasNext())
- {
- int colCnt = 0;
- Object oRow = pItr.next();
-
- /*
- * when there is no Windowing expressions or having;
- * just forward the Object coming out of the Partition.
- */
- if ( forwardRowsUntouched ) {
- forward(oRow, outputObjInspector);
- continue;
- }
-
- /*
- * Setup the output row columns in the following order
- * - the columns in the SelectList processed by the PTF
- * (ie the Select Exprs that have navigation expressions)
- * - the columns from the final PTF.
- */
-
- if ( wdwExprs != null ) {
- for (WindowExpressionDef wdwExpr : wdwExprs)
- {
- Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
- output[colCnt++] = newCol;
- }
- }
-
- for(; colCnt < numCols; ) {
- StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
- output[colCnt++] =
- ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
- field.getFieldObjectInspector());
- }
-
- forward(output, outputObjInspector);
- }
- }
/**
* Create a new Partition.
@@ -390,8 +258,7 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
- HiveConf hiveConf, boolean isMapSide) throws HiveException
- {
+ HiveConf hiveConf, boolean isMapSide) throws HiveException {
PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
TableFunctionEvaluator tEval = tabDef.getTFunction();
@@ -410,14 +277,12 @@ public class PTFOperator extends Operato
}
public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
- PTFPartitionIterator<Object> pItr) throws HiveException
- {
+ PTFPartitionIterator<Object> pItr) throws HiveException {
List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
if (llFnDescs == null) {
return;
}
- for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
- {
+ for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
.getGenericUDF();
llFn.setpItr(pItr);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 29 15:44:42 2013
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -223,12 +222,6 @@ public final class ColumnPrunerProcFacto
}
}
}
- if ( tDef.getWindowExpressions() != null ) {
- for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
- ExprNodeDesc exprNode = expr.getExprNode();
- Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
- }
- }
if(tDef.getPartition() != null){
for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
ExprNodeDesc exprNode = col.getExprNode();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Thu Aug 29 15:44:42 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.RangeBoundaryDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -200,83 +199,26 @@ public class PTFTranslator {
/*
* set outputFromWdwFnProcessing
*/
- if (windowFunctions.size() > 0) {
- ArrayList<String> aliases = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
- for (WindowFunctionDef wFnDef : windowFunctions) {
- aliases.add(wFnDef.getAlias());
- if (wFnDef.isPivotResult()) {
- fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
- } else {
- fieldOIs.add(wFnDef.getOI());
- }
- }
- PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
- StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
- aliases, fieldOIs);
- tFn.setWdwProcessingOutputOI(wdwOutOI);
- RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef, false);
- ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
- wdwTFnDef.setOutputFromWdwFnProcessing(wdwOutShape);
- }
- else {
- wdwTFnDef.setOutputFromWdwFnProcessing(inpShape);
- }
-
- /*
- * process Wdw expressions
- */
- ShapeDetails wdwOutShape = wdwTFnDef.getOutputFromWdwFnProcessing();
- ArrayList<WindowExpressionDef> windowExpressions = new ArrayList<WindowExpressionDef>();
- if (wdwSpec.getWindowExpressions() != null) {
- for (WindowExpressionSpec expr : wdwSpec.getWindowExpressions()) {
- if (!(expr instanceof WindowFunctionSpec)) {
- try {
- PTFExpressionDef eDef = buildExpressionDef(wdwOutShape, expr.getExpression());
- WindowExpressionDef wdwEDef = new WindowExpressionDef(eDef);
- wdwEDef.setAlias(expr.getAlias());
- windowExpressions.add(wdwEDef);
- } catch (HiveException he) {
- throw new SemanticException(he);
- }
- }
+ ArrayList<String> aliases = new ArrayList<String>();
+ ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ for (WindowFunctionDef wFnDef : windowFunctions) {
+ aliases.add(wFnDef.getAlias());
+ if (wFnDef.isPivotResult()) {
+ fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+ } else {
+ fieldOIs.add(wFnDef.getOI());
}
- wdwTFnDef.setWindowExpressions(windowExpressions);
- }
-
- /*
- * set outputOI
- */
- if (windowExpressions.size() > 0) {
- ArrayList<String> aliases = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
- for (WindowExpressionDef wEDef : windowExpressions) {
- aliases.add(wEDef.getAlias());
- fieldOIs.add(wEDef.getOI());
- }
- PTFTranslator.addInputColumnsToList(wdwOutShape, aliases, fieldOIs);
- StructObjectInspector outOI = ObjectInspectorFactory.getStandardStructObjectInspector(
- aliases, fieldOIs);
- RowResolver outRR = buildRowResolverForWindowing(wdwTFnDef, true);
- ShapeDetails outShape = setupShape(outOI, null, outRR);
- wdwTFnDef.setOutputShape(outShape);
- }
- else {
- wdwTFnDef.setOutputShape(copyShape(wdwOutShape));
}
+ PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
+ StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+ aliases, fieldOIs);
+ tFn.setWdwProcessingOutputOI(wdwOutOI);
+ RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef);
+ ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
+ wdwTFnDef.setOutputShape(wdwOutShape);
tFn.setupOutputOI();
- /*
- * If we have windowExpressions then we convert to Std. Object to process;
- * we just stream these rows; no need to put in an output Partition.
- */
- if (windowExpressions.size() > 0) {
- StructObjectInspector oi = (StructObjectInspector)
- ObjectInspectorUtils.getStandardObjectInspector(wdwTFnDef.getOutputShape().getOI());
- wdwTFnDef.getOutputShape().setOI(oi);
- }
-
return ptfDesc;
}
@@ -949,23 +891,10 @@ public class PTFTranslator {
return rwsch;
}
- protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def,
- boolean addWdwExprs) throws SemanticException {
+ protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def)
+ throws SemanticException {
RowResolver rr = new RowResolver();
HashMap<String, WindowExpressionSpec> aliasToExprMap = windowingSpec.getAliasToWdwExpr();
- /*
- * add Window Expressions
- */
- if (addWdwExprs) {
- for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
- ASTNode ast = aliasToExprMap.get(wEDef.getAlias()).getExpression();
- ColumnInfo cInfo = new ColumnInfo(wEDef.getAlias(),
- TypeInfoUtils.getTypeInfoFromObjectInspector(wEDef.getOI()),
- null,
- false);
- rr.putExpression(ast, cInfo);
- }
- }
/*
* add Window Functions
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Thu Aug 29 15:44:42 2013
@@ -67,19 +67,6 @@ public class WindowingSpec {
windowSpecs.put(name, wdwSpec);
}
- public void addExpression(ASTNode expr, String alias) {
- windowExpressions = windowExpressions == null ?
- new ArrayList<WindowExpressionSpec>() : windowExpressions;
- aliasToWdwExpr = aliasToWdwExpr == null ?
- new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
- WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
- wExprSpec.setAlias(alias);
- wExprSpec.setExpression(expr);
-
- windowExpressions.add(wExprSpec);
- aliasToWdwExpr.put(alias, wExprSpec);
- }
-
public void addWindowFunction(WindowFunctionSpec wFn) {
windowExpressions = windowExpressions == null ?
new ArrayList<WindowExpressionSpec>() : windowExpressions;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java Thu Aug 29 15:44:42 2013
@@ -239,28 +239,8 @@ public class PTFDesc extends AbstractOpe
}
public static class WindowTableFunctionDef extends PartitionedTableFunctionDef {
- ArrayList<WindowExpressionDef> windowExpressions;
ArrayList<WindowFunctionDef> windowFunctions;
- /*
- * this shape omits the non WdwFunction Expressions. Expr Evaluators for the Window Expressions is based on this
- * shape, so they can refer to the Wdw Function values.
- * @note: this will eventually be removed, as plan is to push Wdw expression processing to separate Select Op after
- * PTF Op.
- */
- ShapeDetails outputFromWdwFnProcessing;
-
- public ArrayList<WindowExpressionDef> getWindowExpressions() {
- return windowExpressions;
- }
- public void setWindowExpressions(ArrayList<WindowExpressionDef> windowExpressions) {
- this.windowExpressions = windowExpressions;
- }
- public ShapeDetails getOutputFromWdwFnProcessing() {
- return outputFromWdwFnProcessing;
- }
- public void setOutputFromWdwFnProcessing(ShapeDetails outputFromWdwFnProcessing) {
- this.outputFromWdwFnProcessing = outputFromWdwFnProcessing;
- }
+
public ArrayList<WindowFunctionDef> getWindowFunctions() {
return windowFunctions;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java Thu Aug 29 15:44:42 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -53,7 +52,6 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -114,67 +112,37 @@ public class PTFDeserializer {
/*
* 2. initialize WFns.
*/
- if (def.getWindowFunctions() != null) {
- for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
-
- if (wFnDef.getArgs() != null) {
- for (PTFExpressionDef arg : wFnDef.getArgs()) {
- initialize(arg, inpShape);
- }
+ for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+ if (wFnDef.getArgs() != null) {
+ for (PTFExpressionDef arg : wFnDef.getArgs()) {
+ initialize(arg, inpShape);
}
- if (wFnDef.getWindowFrame() != null) {
- WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
- initialize(wFrmDef.getStart(), inpShape);
- initialize(wFrmDef.getEnd(), inpShape);
- }
- setupWdwFnEvaluator(wFnDef);
}
- ArrayList<String> aliases = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
- for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
- aliases.add(wFnDef.getAlias());
- if (wFnDef.isPivotResult()) {
- fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
- } else {
- fieldOIs.add(wFnDef.getOI());
- }
+ if (wFnDef.getWindowFrame() != null) {
+ WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
+ initialize(wFrmDef.getStart(), inpShape);
+ initialize(wFrmDef.getEnd(), inpShape);
}
- PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
- StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
- aliases, fieldOIs);
- tResolver.setWdwProcessingOutputOI(wdwOutOI);
- initialize(def.getOutputFromWdwFnProcessing(), wdwOutOI);
- } else {
- def.setOutputFromWdwFnProcessing(inpShape);
+ setupWdwFnEvaluator(wFnDef);
}
-
- inpShape = def.getOutputFromWdwFnProcessing();
-
- /*
- * 3. initialize WExprs. + having clause
- */
- if (def.getWindowExpressions() != null) {
- for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
- initialize(wEDef, inpShape);
+ ArrayList<String> aliases = new ArrayList<String>();
+ ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+ aliases.add(wFnDef.getAlias());
+ if (wFnDef.isPivotResult()) {
+ fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+ } else {
+ fieldOIs.add(wFnDef.getOI());
}
}
- /*
- * 4. give Evaluator chance to setup for Output execution; setup Output shape.
- */
+ PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
+ StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+ aliases, fieldOIs);
+ tResolver.setWdwProcessingOutputOI(wdwOutOI);
+ initialize(def.getOutputShape(), wdwOutOI);
tResolver.initializeOutputOI();
- initialize(def.getOutputShape(), tEval.getOutputOI());
-
- /*
- * If we have windowExpressions then we convert to Std. Object to process;
- * we just stream these rows; no need to put in an output Partition.
- */
- if (def.getWindowExpressions().size() > 0) {
- StructObjectInspector oi = (StructObjectInspector)
- ObjectInspectorUtils.getStandardObjectInspector(def.getOutputShape().getOI());
- def.getOutputShape().setOI(oi);
- }
}
protected void initialize(PTFQueryInputDef def, StructObjectInspector OI) throws HiveException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Thu Aug 29 15:44:42 2013
@@ -46,34 +46,12 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-public class WindowingTableFunction extends TableFunctionEvaluator
-{
-
- @Override
- public PTFPartition execute(PTFPartition iPart)
- throws HiveException
- {
- WindowTableFunctionDef wFnDef = (WindowTableFunctionDef) getTableDef();
- PTFPartitionIterator<Object> pItr = iPart.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
-
- if ( outputPartition == null ) {
- outputPartition = PTFPartition.create(ptfDesc.getCfg(),
- wFnDef.getOutputFromWdwFnProcessing().getSerde(),
- OI, wFnDef.getOutputFromWdwFnProcessing().getOI());
- }
- else {
- outputPartition.reset();
- }
-
- execute(pItr, outputPartition);
- return outputPartition;
- }
+@SuppressWarnings("deprecation")
+public class WindowingTableFunction extends TableFunctionEvaluator {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
- {
+ public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
PTFPartition iPart = pItr.getPartition();
StructObjectInspector inputOI;
@@ -82,36 +60,29 @@ public class WindowingTableFunction exte
WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
Order order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
- for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions())
- {
+ for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
boolean processWindow = processWindow(wFn);
pItr.reset();
- if ( !processWindow )
- {
+ if ( !processWindow ) {
GenericUDAFEvaluator fEval = wFn.getWFnEval();
Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
- while(pItr.hasNext())
- {
+ while(pItr.hasNext()) {
Object row = pItr.next();
int i =0;
if ( wFn.getArgs() != null ) {
- for(PTFExpressionDef arg : wFn.getArgs())
- {
+ for(PTFExpressionDef arg : wFn.getArgs()) {
args[i++] = arg.getExprEvaluator().evaluate(row);
}
}
fEval.aggregate(aggBuffer, args);
}
Object out = fEval.evaluate(aggBuffer);
- if ( !wFn.isPivotResult())
- {
+ if ( !wFn.isPivotResult()) {
out = new SameList(iPart.size(), out);
}
oColumns.add((List<?>)out);
- }
- else
- {
+ } else {
oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart, order));
}
}
@@ -122,18 +93,15 @@ public class WindowingTableFunction exte
* - the input Rows columns
*/
- for(int i=0; i < iPart.size(); i++)
- {
+ for(int i=0; i < iPart.size(); i++) {
ArrayList oRow = new ArrayList();
Object iRow = iPart.getAt(i);
- for(int j=0; j < oColumns.size(); j++)
- {
+ for(int j=0; j < oColumns.size(); j++) {
oRow.add(oColumns.get(j).get(i));
}
- for(StructField f : inputOI.getAllStructFieldRefs())
- {
+ for(StructField f : inputOI.getAllStructFieldRefs()) {
oRow.add(inputOI.getStructFieldData(iRow, f));
}
@@ -187,15 +155,13 @@ public class WindowingTableFunction exte
* - the Window Functions.
*/
@Override
- public void initializeOutputOI() throws HiveException
- {
+ public void initializeOutputOI() throws HiveException {
setupOutputOI();
}
@Override
- public boolean transformsRawInput()
- {
+ public boolean transformsRawInput() {
return false;
}
@@ -225,26 +191,22 @@ public class WindowingTableFunction exte
WindowFunctionDef wFnDef,
PTFPartition iPart,
Order order)
- throws HiveException
- {
+ throws HiveException {
ArrayList<Object> vals = new ArrayList<Object>();
GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
- for(int i=0; i < iPart.size(); i++)
- {
+ for(int i=0; i < iPart.size(); i++) {
AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
Range rng = getRange(wFnDef, i, iPart, order);
PTFPartitionIterator<Object> rItr = rng.iterator();
PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- while(rItr.hasNext())
- {
+ while(rItr.hasNext()) {
Object row = rItr.next();
int j = 0;
if ( wFnDef.getArgs() != null ) {
- for(PTFExpressionDef arg : wFnDef.getArgs())
- {
+ for(PTFExpressionDef arg : wFnDef.getArgs()) {
args[j++] = arg.getExprEvaluator().evaluate(row);
}
}
@@ -701,8 +663,7 @@ public class WindowingTableFunction exte
}
}
- public Object computeValue(Object row) throws HiveException
- {
+ public Object computeValue(Object row) throws HiveException {
Object o = expressionDef.getExprEvaluator().evaluate(row);
return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
}
@@ -713,11 +674,10 @@ public class WindowingTableFunction exte
@SuppressWarnings("incomplete-switch")
- public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order) throws HiveException
- {
+ public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order)
+ throws HiveException {
PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) vbDef.getOI();
- switch(pOI.getPrimitiveCategory())
- {
+ switch(pOI.getPrimitiveCategory()) {
case BYTE:
case INT:
case LONG:
@@ -736,16 +696,14 @@ public class WindowingTableFunction exte
}
}
- public static class LongValueBoundaryScanner extends ValueBoundaryScanner
- {
- public LongValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
- {
+ public static class LongValueBoundaryScanner extends ValueBoundaryScanner {
+ public LongValueBoundaryScanner(BoundaryDef bndDef, Order order,
+ PTFExpressionDef expressionDef) {
super(bndDef,order,expressionDef);
}
@Override
- public boolean isGreater(Object v1, Object v2, int amt)
- {
+ public boolean isGreater(Object v1, Object v2, int amt) {
long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -754,8 +712,7 @@ public class WindowingTableFunction exte
}
@Override
- public boolean isEqual(Object v1, Object v2)
- {
+ public boolean isEqual(Object v1, Object v2) {
long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -764,16 +721,14 @@ public class WindowingTableFunction exte
}
}
- public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner
- {
- public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
- {
+ public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner {
+ public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order,
+ PTFExpressionDef expressionDef) {
super(bndDef,order,expressionDef);
}
@Override
- public boolean isGreater(Object v1, Object v2, int amt)
- {
+ public boolean isGreater(Object v1, Object v2, int amt) {
double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -782,8 +737,7 @@ public class WindowingTableFunction exte
}
@Override
- public boolean isEqual(Object v1, Object v2)
- {
+ public boolean isEqual(Object v1, Object v2) {
double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -792,16 +746,14 @@ public class WindowingTableFunction exte
}
}
- public static class StringValueBoundaryScanner extends ValueBoundaryScanner
- {
- public StringValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
- {
+ public static class StringValueBoundaryScanner extends ValueBoundaryScanner {
+ public StringValueBoundaryScanner(BoundaryDef bndDef, Order order,
+ PTFExpressionDef expressionDef) {
super(bndDef,order,expressionDef);
}
@Override
- public boolean isGreater(Object v1, Object v2, int amt)
- {
+ public boolean isGreater(Object v1, Object v2, int amt) {
String s1 = PrimitiveObjectInspectorUtils.getString(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -810,8 +762,7 @@ public class WindowingTableFunction exte
}
@Override
- public boolean isEqual(Object v1, Object v2)
- {
+ public boolean isEqual(Object v1, Object v2) {
String s1 = PrimitiveObjectInspectorUtils.getString(v1,
(PrimitiveObjectInspector) expressionDef.getOI());
String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -820,26 +771,22 @@ public class WindowingTableFunction exte
}
}
- public static class SameList<E> extends AbstractList<E>
- {
+ public static class SameList<E> extends AbstractList<E> {
int sz;
E val;
- public SameList(int sz, E val)
- {
+ public SameList(int sz, E val) {
this.sz = sz;
this.val = val;
}
@Override
- public E get(int index)
- {
+ public E get(int index) {
return val;
}
@Override
- public int size()
- {
+ public int size() {
return sz;
}