You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by ha...@apache.org on 2014/04/29 16:54:17 UTC

svn commit: r1590999 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/PTFOperator.java parse/PTFTranslator.java plan/PTFDeserializer.java udf/ptf/TableFunctionEvaluator.java udf/ptf/WindowingTableFunction.java

Author: hashutosh
Date: Tue Apr 29 14:54:16 2014
New Revision: 1590999

URL: http://svn.apache.org/r1590999
Log:
HIVE-4965 : Add support so that PTFs can stream their output; Windowing PTF should do this (Harish Butani via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Apr 29 14:54:16 2014
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Stack;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -179,8 +181,8 @@ public class PTFOperator extends Operato
 	}
 
 	protected void processInputPartition() throws HiveException {
-	  PTFPartition outPart = executeChain(inputPart);
-	  PTFPartitionIterator<Object> pItr = outPart.iterator();
+    Iterator<Object> pItr = executeChain(inputPart);
+
     while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
@@ -189,8 +191,11 @@ public class PTFOperator extends Operato
 
 	protected void processMapFunction() throws HiveException {
 	  PartitionedTableFunctionDef tDef = conf.getStartOfChain();
-    PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
-    PTFPartitionIterator<Object> pItr = outPart.iterator();
+    
+    Iterator<Object> pItr = tDef.getTFunction().canIterateOutput() ? 
+        tDef.getTFunction().transformRawInputIterator(inputPart.iterator()) :
+          tDef.getTFunction().transformRawInput(inputPart).iterator();
+    
     while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
@@ -225,9 +230,9 @@ public class PTFOperator extends Operato
    * @return
    * @throws HiveException
    */
-  private PTFPartition executeChain(PTFPartition part)
+  private Iterator<Object> executeChain(PTFPartition part)
       throws HiveException {
-    Deque<PartitionedTableFunctionDef> fnDefs = new ArrayDeque<PartitionedTableFunctionDef>();
+    Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
 
     while (iDef instanceof PartitionedTableFunctionDef) {
@@ -236,11 +241,21 @@ public class PTFOperator extends Operato
     }
 
     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty()) {
+    int i = fnDefs.size();
+    while (i > 1) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
+      i--;
     }
-    return part;
+
+    currFnDef = fnDefs.pop();
+    if (!currFnDef.getTFunction().canIterateOutput()) {
+      part = currFnDef.getTFunction().execute(part);
+      return part.iterator();
+    } else {
+      return currFnDef.getTFunction().iterator(part.iterator());
+    }
+
   }
 
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Tue Apr 29 14:54:16 2014
@@ -137,6 +137,7 @@ public class PTFTranslator {
     ptfDesc.setCfg(hCfg);
     ptfDesc.setLlInfo(llInfo);
     translatePTFChain();
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
     return ptfDesc;
   }
 
@@ -222,6 +223,8 @@ public class PTFTranslator {
 
     tFn.setupOutputOI();
 
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
+
     return ptfDesc;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java Tue Apr 29 14:54:16 2014
@@ -95,6 +95,8 @@ public class PTFDeserializer {
         initialize((PartitionedTableFunctionDef) currentDef);
       }
     }
+
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
   }
 
   public void initializeWindowing(WindowTableFunctionDef def) throws HiveException {
@@ -331,4 +333,17 @@ public class PTFDeserializer {
     {fnames, fields};
   }
 
+  /*
+   * If the final PTF in a PTFChain can stream its output, then set the OI of its OutputShape
+   * to the OI returned by the TableFunctionEvaluator.
+   */
+  public static void alterOutputOIForStreaming(PTFDesc ptfDesc) {
+    PartitionedTableFunctionDef tDef = ptfDesc.getFuncDef();
+    TableFunctionEvaluator tEval = tDef.getTFunction();
+
+    if ( tEval.canIterateOutput() ) {
+      tDef.getOutputShape().setOI(tEval.getOutputOI());
+    }
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Tue Apr 29 14:54:16 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.udf.ptf;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -138,6 +140,37 @@ public abstract class TableFunctionEvalu
     return null;
   }
 
+
+  /*
+   * A TableFunction may be able to provide its Output as an Iterator.
+   * In case it can then for Map-side processing and for the last PTF in a Reduce-side chain
+   * we can forward rows one by one. This will save the time/space to populate and read an Output
+   * Partition.
+   */
+  public boolean canIterateOutput() {
+    return false;
+  }
+
+  public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+    if (!canIterateOutput()) {
+      throw new HiveException(
+          "Internal error: iterator called on a PTF that cannot provide its output as an Iterator");
+    }
+    throw new HiveException(String.format(
+        "Internal error: PTF %s, provides no iterator method",
+        getClass().getName()));
+  }
+  
+  public Iterator<Object> transformRawInputIterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+    if (!canIterateOutput()) {
+      throw new HiveException(
+          "Internal error: iterator called on a PTF that cannot provide its output as an Iterator");
+    }
+    throw new HiveException(String.format(
+        "Internal error: PTF %s, provides no iterator method",
+        getClass().getName()));
+  }
+
   public void close() {
     if (outputPartition != null) {
       outputPartition.close();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1590999&r1=1590998&r2=1590999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Tue Apr 29 14:54:16 2014
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.udf.pt
 
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -64,20 +66,7 @@ public class WindowingTableFunction exte
       boolean processWindow = processWindow(wFn);
       pItr.reset();
       if ( !processWindow ) {
-        GenericUDAFEvaluator fEval = wFn.getWFnEval();
-        Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
-        AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
-        while(pItr.hasNext()) {
-          Object row = pItr.next();
-          int i =0;
-          if ( wFn.getArgs() != null ) {
-            for(PTFExpressionDef arg : wFn.getArgs()) {
-              args[i++] = arg.getExprEvaluator().evaluate(row);
-            }
-          }
-          fEval.aggregate(aggBuffer, args);
-        }
-        Object out = fEval.evaluate(aggBuffer);
+        Object out = evaluateWindowFunction(wFn, pItr);
         if ( !wFn.isPivotResult()) {
           out = new SameList(iPart.size(), out);
         }
@@ -109,6 +98,28 @@ public class WindowingTableFunction exte
     }
   }
 
+  Object evaluateWindowFunction(WindowFunctionDef wFn,
+      PTFPartitionIterator<Object> pItr) throws HiveException {
+    GenericUDAFEvaluator fEval = wFn.getWFnEval();
+    Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+    AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
+    while(pItr.hasNext())
+    {
+      Object row = pItr.next();
+      int i =0;
+      if ( wFn.getArgs() != null ) {
+        for(PTFExpressionDef arg : wFn.getArgs())
+        {
+          args[i++] = arg.getExprEvaluator().evaluate(row);
+        }
+      }
+      fEval.aggregate(aggBuffer, args);
+    }
+    Object out = fEval.evaluate(aggBuffer);
+    out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
+    return out;
+  }
+
   private boolean processWindow(WindowFunctionDef wFn) {
     WindowFrameDef frame = wFn.getWindowFrame();
     if ( frame == null ) {
@@ -121,6 +132,54 @@ public class WindowingTableFunction exte
     return true;
   }
 
+  @Override
+  public boolean canIterateOutput() {
+    return true;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
+    ArrayList<Object> output = new ArrayList<Object>();
+    List<?>[] outputFromPivotFunctions = new List<?>[wTFnDef.getWindowFunctions().size()];
+    ArrayList<Integer> wFnsWithWindows = new ArrayList<Integer>();
+    PTFPartition iPart = pItr.getPartition();
+
+    int i=0;
+    for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
+      boolean processWindow = processWindow(wFn);
+      pItr.reset();
+      if ( !processWindow && !wFn.isPivotResult() ) {
+        Object out = evaluateWindowFunction(wFn, pItr);
+        output.add(out);
+      } else if (wFn.isPivotResult()) {
+        /*
+         * for functions that currently return the output as a List,
+         * for e.g. the ranking functions, lead/lag, ntile, cume_dist
+         * - for now continue to execute them here. The functions need to provide a way to get
+         *   each output row as we are iterating through the input. This is relative
+         *   easy to do for ranking functions; not possible for lead, ntile, cume_dist.
+         *
+         */
+        outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+        output.add(null);
+      } else {
+        output.add(null);
+        wFnsWithWindows.add(i);
+      }
+      i++;
+    }
+
+    i=0;
+    for(i=0; i < iPart.getOutputOI().getAllStructFieldRefs().size(); i++) {
+      output.add(null);
+    }
+
+    return new WindowingIterator(iPart, output, outputFromPivotFunctions,
+        ArrayUtils.toPrimitive(wFnsWithWindows.toArray(new Integer[wFnsWithWindows.size()])));
+  }
+
   public static class WindowingTableFunctionResolver extends TableFunctionResolver
   {
     /*
@@ -193,27 +252,11 @@ public class WindowingTableFunction exte
       Order order)
     throws HiveException {
     ArrayList<Object> vals = new ArrayList<Object>();
-
-    GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
-
-    Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
     for(int i=0; i < iPart.size(); i++) {
-      AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
       Range rng = getRange(wFnDef, i, iPart, order);
       PTFPartitionIterator<Object> rItr = rng.iterator();
       PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-      while(rItr.hasNext()) {
-        Object row = rItr.next();
-        int j = 0;
-        if ( wFnDef.getArgs() != null ) {
-          for(PTFExpressionDef arg : wFnDef.getArgs()) {
-            args[j++] = arg.getExprEvaluator().evaluate(row);
-          }
-        }
-        fEval.aggregate(aggBuffer, args);
-      }
-      Object out = fEval.evaluate(aggBuffer);
-      out = ObjectInspectorUtils.copyToStandardObject(out, wFnDef.getOI());
+      Object out = evaluateWindowFunction(wFnDef, rItr);
       vals.add(out);
     }
     return vals;
@@ -792,4 +835,77 @@ public class WindowingTableFunction exte
 
   }
 
+  public class WindowingIterator implements Iterator<Object> {
+
+    ArrayList<Object> output;
+    List<?>[] outputFromPivotFunctions;
+    int currIdx;
+    PTFPartition iPart;
+    /*
+     * these are the functions that have a Window.
+     * Fns w/o a Window have already been processed.
+     */
+    int[] wFnsToProcess;
+    WindowTableFunctionDef wTFnDef;
+    Order order;
+    PTFDesc ptfDesc;
+    StructObjectInspector inputOI;
+
+    WindowingIterator(PTFPartition iPart, ArrayList<Object>  output,
+        List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) {
+      this.iPart = iPart;
+      this.output = output;
+      this.outputFromPivotFunctions = outputFromPivotFunctions;
+      this.wFnsToProcess = wFnsToProcess;
+      this.currIdx = 0;
+      wTFnDef = (WindowTableFunctionDef) getTableDef();
+      order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
+      ptfDesc = getQueryDef();
+      inputOI = iPart.getOutputOI();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currIdx < iPart.size();
+    }
+
+    @Override
+    public Object next() {
+      int i;
+      for(i = 0; i < outputFromPivotFunctions.length; i++ ) {
+        if ( outputFromPivotFunctions[i] != null ) {
+          output.set(i, outputFromPivotFunctions[i].get(currIdx));
+        }
+      }
+
+      try {
+        for (int j : wFnsToProcess) {
+          WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
+          Range rng = getRange(wFn, currIdx, iPart, order);
+          PTFPartitionIterator<Object> rItr = rng.iterator();
+          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+          output.set(j, evaluateWindowFunction(wFn, rItr));
+        }
+
+        Object iRow = iPart.getAt(currIdx);
+        i = wTFnDef.getWindowFunctions().size();
+        for (StructField f : inputOI.getAllStructFieldRefs()) {
+          output.set(i++, inputOI.getStructFieldData(iRow, f));
+        }
+
+      } catch (HiveException he) {
+        throw new RuntimeException(he);
+      }
+
+      currIdx++;
+      return output;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
 }