You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/04/29 16:54:17 UTC
svn commit: r1590999 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/PTFOperator.java parse/PTFTranslator.java plan/PTFDeserializer.java
udf/ptf/TableFunctionEvaluator.java udf/ptf/WindowingTableFunction.java
Author: hashutosh
Date: Tue Apr 29 14:54:16 2014
New Revision: 1590999
URL: http://svn.apache.org/r1590999
Log:
HIVE-4965 : Add support so that PTFs can stream their output; Windowing PTF should do this (Harish Butani via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Apr 29 14:54:16 2014
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.Iterator;
import java.util.List;
+import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -179,8 +181,8 @@ public class PTFOperator extends Operato
}
protected void processInputPartition() throws HiveException {
- PTFPartition outPart = executeChain(inputPart);
- PTFPartitionIterator<Object> pItr = outPart.iterator();
+ Iterator<Object> pItr = executeChain(inputPart);
+
while (pItr.hasNext()) {
Object oRow = pItr.next();
forward(oRow, outputObjInspector);
@@ -189,8 +191,11 @@ public class PTFOperator extends Operato
protected void processMapFunction() throws HiveException {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
- PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
- PTFPartitionIterator<Object> pItr = outPart.iterator();
+
+ Iterator<Object> pItr = tDef.getTFunction().canIterateOutput() ?
+ tDef.getTFunction().transformRawInputIterator(inputPart.iterator()) :
+ tDef.getTFunction().transformRawInput(inputPart).iterator();
+
while (pItr.hasNext()) {
Object oRow = pItr.next();
forward(oRow, outputObjInspector);
@@ -225,9 +230,9 @@ public class PTFOperator extends Operato
* @return
* @throws HiveException
*/
- private PTFPartition executeChain(PTFPartition part)
+ private Iterator<Object> executeChain(PTFPartition part)
throws HiveException {
- Deque<PartitionedTableFunctionDef> fnDefs = new ArrayDeque<PartitionedTableFunctionDef>();
+ Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
PTFInputDef iDef = conf.getFuncDef();
while (iDef instanceof PartitionedTableFunctionDef) {
@@ -236,11 +241,21 @@ public class PTFOperator extends Operato
}
PartitionedTableFunctionDef currFnDef;
- while (!fnDefs.isEmpty()) {
+ int i = fnDefs.size();
+ while (i > 1) {
currFnDef = fnDefs.pop();
part = currFnDef.getTFunction().execute(part);
+ i--;
}
- return part;
+
+ currFnDef = fnDefs.pop();
+ if (!currFnDef.getTFunction().canIterateOutput()) {
+ part = currFnDef.getTFunction().execute(part);
+ return part.iterator();
+ } else {
+ return currFnDef.getTFunction().iterator(part.iterator());
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Tue Apr 29 14:54:16 2014
@@ -137,6 +137,7 @@ public class PTFTranslator {
ptfDesc.setCfg(hCfg);
ptfDesc.setLlInfo(llInfo);
translatePTFChain();
+ PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
return ptfDesc;
}
@@ -222,6 +223,8 @@ public class PTFTranslator {
tFn.setupOutputOI();
+ PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
+
return ptfDesc;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java Tue Apr 29 14:54:16 2014
@@ -95,6 +95,8 @@ public class PTFDeserializer {
initialize((PartitionedTableFunctionDef) currentDef);
}
}
+
+ PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
}
public void initializeWindowing(WindowTableFunctionDef def) throws HiveException {
@@ -331,4 +333,17 @@ public class PTFDeserializer {
{fnames, fields};
}
+ /*
+ * If the final PTF in a PTFChain can stream its output, then set the OI of its OutputShape
+ * to the OI returned by the TableFunctionEvaluator.
+ */
+ public static void alterOutputOIForStreaming(PTFDesc ptfDesc) {
+ PartitionedTableFunctionDef tDef = ptfDesc.getFuncDef();
+ TableFunctionEvaluator tEval = tDef.getTFunction();
+
+ if ( tEval.canIterateOutput() ) {
+ tDef.getOutputShape().setOI(tEval.getOutputOI());
+ }
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Tue Apr 29 14:54:16 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.udf.ptf;
+import java.util.Iterator;
+
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -138,6 +140,37 @@ public abstract class TableFunctionEvalu
return null;
}
+
+ /*
+ * A TableFunction may be able to provide its Output as an Iterator.
+ * If it can, then for Map-side processing and for the last PTF in a Reduce-side chain
+ * we can forward rows one by one. This saves the time/space needed to populate and read
+ * an Output Partition.
+ */
+ public boolean canIterateOutput() {
+ return false;
+ }
+
+ public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+ if (!canIterateOutput()) {
+ throw new HiveException(
+ "Internal error: iterator called on a PTF that cannot provide its output as an Iterator");
+ }
+ throw new HiveException(String.format(
+ "Internal error: PTF %s, provides no iterator method",
+ getClass().getName()));
+ }
+
+ public Iterator<Object> transformRawInputIterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+ if (!canIterateOutput()) {
+ throw new HiveException(
+ "Internal error: iterator called on a PTF that cannot provide its output as an Iterator");
+ }
+ throw new HiveException(String.format(
+ "Internal error: PTF %s, provides no iterator method",
+ getClass().getName()));
+ }
+
public void close() {
if (outputPartition != null) {
outputPartition.close();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Tue Apr 29 14:54:16 2014
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.udf.pt
import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -64,20 +66,7 @@ public class WindowingTableFunction exte
boolean processWindow = processWindow(wFn);
pItr.reset();
if ( !processWindow ) {
- GenericUDAFEvaluator fEval = wFn.getWFnEval();
- Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
- AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
- while(pItr.hasNext()) {
- Object row = pItr.next();
- int i =0;
- if ( wFn.getArgs() != null ) {
- for(PTFExpressionDef arg : wFn.getArgs()) {
- args[i++] = arg.getExprEvaluator().evaluate(row);
- }
- }
- fEval.aggregate(aggBuffer, args);
- }
- Object out = fEval.evaluate(aggBuffer);
+ Object out = evaluateWindowFunction(wFn, pItr);
if ( !wFn.isPivotResult()) {
out = new SameList(iPart.size(), out);
}
@@ -109,6 +98,28 @@ public class WindowingTableFunction exte
}
}
+ Object evaluateWindowFunction(WindowFunctionDef wFn,
+ PTFPartitionIterator<Object> pItr) throws HiveException {
+ GenericUDAFEvaluator fEval = wFn.getWFnEval();
+ Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+ AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
+ while(pItr.hasNext())
+ {
+ Object row = pItr.next();
+ int i =0;
+ if ( wFn.getArgs() != null ) {
+ for(PTFExpressionDef arg : wFn.getArgs())
+ {
+ args[i++] = arg.getExprEvaluator().evaluate(row);
+ }
+ }
+ fEval.aggregate(aggBuffer, args);
+ }
+ Object out = fEval.evaluate(aggBuffer);
+ out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
+ return out;
+ }
+
private boolean processWindow(WindowFunctionDef wFn) {
WindowFrameDef frame = wFn.getWindowFrame();
if ( frame == null ) {
@@ -121,6 +132,54 @@ public class WindowingTableFunction exte
return true;
}
+ @Override
+ public boolean canIterateOutput() {
+ return true;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+ WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
+ ArrayList<Object> output = new ArrayList<Object>();
+ List<?>[] outputFromPivotFunctions = new List<?>[wTFnDef.getWindowFunctions().size()];
+ ArrayList<Integer> wFnsWithWindows = new ArrayList<Integer>();
+ PTFPartition iPart = pItr.getPartition();
+
+ int i=0;
+ for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
+ boolean processWindow = processWindow(wFn);
+ pItr.reset();
+ if ( !processWindow && !wFn.isPivotResult() ) {
+ Object out = evaluateWindowFunction(wFn, pItr);
+ output.add(out);
+ } else if (wFn.isPivotResult()) {
+ /*
+ * for functions that currently return the output as a List,
+ * for e.g. the ranking functions, lead/lag, ntile, cume_dist
+ * - for now continue to execute them here. The functions need to provide a way to get
+ * each output row as we are iterating through the input. This is relatively
+ * easy to do for ranking functions; not possible for lead, ntile, cume_dist.
+ *
+ */
+ outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+ output.add(null);
+ } else {
+ output.add(null);
+ wFnsWithWindows.add(i);
+ }
+ i++;
+ }
+
+ i=0;
+ for(i=0; i < iPart.getOutputOI().getAllStructFieldRefs().size(); i++) {
+ output.add(null);
+ }
+
+ return new WindowingIterator(iPart, output, outputFromPivotFunctions,
+ ArrayUtils.toPrimitive(wFnsWithWindows.toArray(new Integer[wFnsWithWindows.size()])));
+ }
+
public static class WindowingTableFunctionResolver extends TableFunctionResolver
{
/*
@@ -193,27 +252,11 @@ public class WindowingTableFunction exte
Order order)
throws HiveException {
ArrayList<Object> vals = new ArrayList<Object>();
-
- GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
-
- Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
for(int i=0; i < iPart.size(); i++) {
- AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
Range rng = getRange(wFnDef, i, iPart, order);
PTFPartitionIterator<Object> rItr = rng.iterator();
PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- while(rItr.hasNext()) {
- Object row = rItr.next();
- int j = 0;
- if ( wFnDef.getArgs() != null ) {
- for(PTFExpressionDef arg : wFnDef.getArgs()) {
- args[j++] = arg.getExprEvaluator().evaluate(row);
- }
- }
- fEval.aggregate(aggBuffer, args);
- }
- Object out = fEval.evaluate(aggBuffer);
- out = ObjectInspectorUtils.copyToStandardObject(out, wFnDef.getOI());
+ Object out = evaluateWindowFunction(wFnDef, rItr);
vals.add(out);
}
return vals;
@@ -792,4 +835,77 @@ public class WindowingTableFunction exte
}
+ public class WindowingIterator implements Iterator<Object> {
+
+ ArrayList<Object> output;
+ List<?>[] outputFromPivotFunctions;
+ int currIdx;
+ PTFPartition iPart;
+ /*
+ * these are the functions that have a Window.
+ * Fns w/o a Window have already been processed.
+ */
+ int[] wFnsToProcess;
+ WindowTableFunctionDef wTFnDef;
+ Order order;
+ PTFDesc ptfDesc;
+ StructObjectInspector inputOI;
+
+ WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
+ List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) {
+ this.iPart = iPart;
+ this.output = output;
+ this.outputFromPivotFunctions = outputFromPivotFunctions;
+ this.wFnsToProcess = wFnsToProcess;
+ this.currIdx = 0;
+ wTFnDef = (WindowTableFunctionDef) getTableDef();
+ order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
+ ptfDesc = getQueryDef();
+ inputOI = iPart.getOutputOI();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currIdx < iPart.size();
+ }
+
+ @Override
+ public Object next() {
+ int i;
+ for(i = 0; i < outputFromPivotFunctions.length; i++ ) {
+ if ( outputFromPivotFunctions[i] != null ) {
+ output.set(i, outputFromPivotFunctions[i].get(currIdx));
+ }
+ }
+
+ try {
+ for (int j : wFnsToProcess) {
+ WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
+ Range rng = getRange(wFn, currIdx, iPart, order);
+ PTFPartitionIterator<Object> rItr = rng.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+ output.set(j, evaluateWindowFunction(wFn, rItr));
+ }
+
+ Object iRow = iPart.getAt(currIdx);
+ i = wTFnDef.getWindowFunctions().size();
+ for (StructField f : inputOI.getAllStructFieldRefs()) {
+ output.set(i++, inputOI.getStructFieldData(iRow, f));
+ }
+
+ } catch (HiveException he) {
+ throw new RuntimeException(he);
+ }
+
+ currIdx++;
+ return output;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
}