Posted to commits@hive.apache.org by ha...@apache.org on 2014/05/31 20:40:51 UTC

svn commit: r1598896 [1/2] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/udf/generic/ java/org/apache/hadoop/hive/ql/udf/ptf/ test/org/apache/hadoop/hive/ql/udaf/ test/results/clientpositive/

Author: hashutosh
Date: Sat May 31 18:40:50 2014
New Revision: 1598896

URL: http://svn.apache.org/r1598896
Log:
HIVE-7062 : Support Streaming mode in Windowing (Harish Butani via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
    hive/trunk/ql/src/test/results/clientpositive/ptf.q.out
    hive/trunk/ql/src/test/results/clientpositive/windowing.q.out
    hive/trunk/ql/src/test/results/clientpositive/windowing_windowspec.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Sat May 31 18:40:50 2014
@@ -83,6 +83,7 @@ public class PTFOperator extends Operato
 		setupKeysWrapper(inputObjInspectors[0]);
 		
 		ptfInvocation = setupChain();
+		ptfInvocation.initializeStreaming(jobConf, isMapOperator);
 		firstMapRow = true;
 
 		super.initializeOp(jobConf);
@@ -282,6 +283,19 @@ public class PTFOperator extends Operato
       return tabFn.canAcceptInputAsStream();
     }
     
+    void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException {
+      PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
+      PTFInputDef inputDef = tabDef.getInput();
+      ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? 
+          inputObjInspectors[0] : inputDef.getOutputShape().getOI();
+
+      tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide);
+
+      if ( next != null ) {
+        next.initializeStreaming(cfg, isMapSide);
+      }
+    }
+    
     void startPartition() throws HiveException {
       if ( isStreaming() ) {
         tabFn.startPartition();
@@ -301,15 +315,6 @@ public class PTFOperator extends Operato
     
     void processRow(Object row) throws HiveException {
       if ( isStreaming() ) {
-        if ( prev == null ) {
-          /*
-           * this is needed because during Translation we are still assuming that rows
-           * are collected into a PTFPartition.
-           * @Todo make translation handle the case when the first PTF is Streaming.
-           */
-          row = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0], 
-              ObjectInspectorCopyOption.WRITABLE);
-        }
         handleOutputRows(tabFn.processRow(row));
       } else {
         inputPart.append(row);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Sat May 31 18:40:50 2014
@@ -51,13 +51,25 @@ public class PTFPartition {
       SerDe serDe, StructObjectInspector inputOI,
       StructObjectInspector outputOI)
       throws HiveException {
+    this(cfg, serDe, inputOI, outputOI, true);
+  }
+  
+  protected PTFPartition(Configuration cfg,
+      SerDe serDe, StructObjectInspector inputOI,
+      StructObjectInspector outputOI,
+      boolean createElemContainer)
+      throws HiveException {
     this.serDe = serDe;
     this.inputOI = inputOI;
     this.outputOI = outputOI;
-    int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
-    elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
-    elems.setSerDe(serDe, outputOI);
-    elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
+    if ( createElemContainer ) {
+      int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+      elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
+      elems.setSerDe(serDe, outputOI);
+      elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
+    } else {
+      elems = null;
+    }
   }
 
   public void reset() throws HiveException {
@@ -233,6 +245,16 @@ public class PTFPartition {
       throws HiveException {
     return new PTFPartition(cfg, serDe, inputOI, outputOI);
   }
+  
+  public static PTFRollingPartition createRolling(Configuration cfg,
+      SerDe serDe,
+      StructObjectInspector inputOI,
+      StructObjectInspector outputOI,
+      int precedingSpan,
+      int followingSpan)
+      throws HiveException {
+    return new PTFRollingPartition(cfg, serDe, inputOI, outputOI, precedingSpan, followingSpan);
+  }
 
   public static StructObjectInspector setupPartitionOutputOI(SerDe serDe,
       StructObjectInspector tblFnOI) throws SerDeException {

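The new protected constructor lets a rolling partition skip the disk-backed PTFRowContainer entirely, and createRolling is the factory the windowing code uses to get one. A hedged usage sketch (not part of the patch; the Configuration, SerDe and ObjectInspectors are assumed to be built by the caller, as PTFOperator already does):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.PTFPartition;
    import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.SerDe;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class RollingPartitionUsageSketch {
      // Obtain an in-memory rolling buffer for a frame of 2 PRECEDING / 1 FOLLOWING rows.
      static PTFRollingPartition makeWindowBuffer(Configuration cfg, SerDe serDe,
          StructObjectInspector inputOI, StructObjectInspector outputOI)
          throws HiveException {
        return PTFPartition.createRolling(cfg, serDe, inputOI, outputOI, 2, 1);
      }
    }
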
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java Sat May 31 18:40:50 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class PTFRollingPartition extends PTFPartition {
+
+  /*
+   * num rows whose output is evaluated.
+   */
+  int numRowsProcessed;
+
+  /*
+   * number of rows to maintain before the next row to process
+   */
+  int precedingSpan;
+
+  /*
+   * number of rows to maintain after the next row to process
+   */
+  int followingSpan;
+
+  /*
+   * number of rows received.
+   */
+  int numRowsReceived;
+  
+  /*
+   * State of the Rolling Partition
+   * 
+   * x0 x1 x2 x3 x4 x5 x6 x7 x8 x9 x10 x11 x12 x13 x14 x15 x16 x17
+   * ^                    ^                                     ^
+   * |                    |                                     |
+   * |--preceding span--numRowsProcessed---followingSpan --numRowsReceived
+   * 
+   * a. index x7 represents the last output row.
+   * b. the precedingSpan rows before it are still retained, since later rows' windows may need them.
+   * c. the number of rows beyond numRowsProcessed equals followingSpan.
+   */
+
+  /*
+   * cache of rows; guaranteed to contain precedingSpan rows before
+   * nextRowToProcess.
+   */
+  List<Object> currWindow;
+
+  protected PTFRollingPartition(Configuration cfg, SerDe serDe,
+      StructObjectInspector inputOI, StructObjectInspector outputOI,
+      int precedingSpan, int succeedingSpan) throws HiveException {
+    super(cfg, serDe, inputOI, outputOI, false);
+    this.precedingSpan = precedingSpan;
+    this.followingSpan = succeedingSpan;
+    currWindow = new ArrayList<Object>(precedingSpan + followingSpan);
+  }
+
+  public void reset() throws HiveException {
+    currWindow.clear();
+    numRowsProcessed = 0;
+    numRowsReceived = 0;
+  }
+
+  public Object getAt(int i) throws HiveException {
+    int rangeStart = numRowsReceived - currWindow.size();
+    return currWindow.get(i - rangeStart);
+  }
+
+  public void append(Object o) throws HiveException {
+    @SuppressWarnings("unchecked")
+    List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+        o, inputOI, ObjectInspectorCopyOption.WRITABLE);
+    currWindow.add(l);
+    numRowsReceived++;
+  }
+
+  public Object nextOutputRow() throws HiveException {
+    Object row = getAt(numRowsProcessed);
+    numRowsProcessed++;
+    if (numRowsProcessed > precedingSpan) {
+      currWindow.remove(0);
+    }
+    return row;
+  }
+
+  public boolean processedAllRows() {
+    return numRowsProcessed >= numRowsReceived;
+  }
+
+  public int rowToProcess(WindowFunctionDef wFn) {
+    int rowToProcess = numRowsReceived - wFn.getWindowFrame().getEnd().getAmt()
+        - 1;
+    return rowToProcess >= 0 ? rowToProcess : -1;
+  }
+
+  public int size() {
+    return numRowsReceived;
+  }
+
+  public PTFPartitionIterator<Object> iterator() throws HiveException {
+    return new RollingPItr();
+  }
+
+  public void close() {
+  }
+
+  class RollingPItr implements PTFPartitionIterator<Object> {
+
+    @Override
+    public boolean hasNext() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object next() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getIndex() {
+      return PTFRollingPartition.this.numRowsProcessed;
+    }
+
+    @Override
+    public Object lead(int amt) throws HiveException {
+      int i = PTFRollingPartition.this.numRowsProcessed + amt;
+      i = i >= PTFRollingPartition.this.numRowsReceived ? PTFRollingPartition.this.numRowsReceived - 1
+          : i;
+      return PTFRollingPartition.this.getAt(i);
+    }
+
+    @Override
+    public Object lag(int amt) throws HiveException {
+      int i = PTFRollingPartition.this.numRowsProcessed - amt;
+      int start = PTFRollingPartition.this.numRowsReceived
+          - PTFRollingPartition.this.currWindow.size();
+
+      i = i < start ? start : i;
+      return PTFRollingPartition.this.getAt(i);
+    }
+
+    @Override
+    public Object resetToIndex(int idx) throws HiveException {
+      return PTFRollingPartition.this.getAt(idx);
+    }
+
+    @Override
+    public PTFPartition getPartition() {
+      return PTFRollingPartition.this;
+    }
+
+    @Override
+    public void reset() throws HiveException {
+    }
+
+  }
+}

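The field comments and the state diagram above describe how absolute row indexes are mapped onto a small in-memory window. A self-contained sketch of that bookkeeping (a hypothetical class, not the Hive one) mirroring append/getAt/nextOutputRow: it keeps at most precedingSpan rows behind the next row to process, so earlier rows that a window function may still reference remain available while everything older is dropped.

    import java.util.ArrayList;
    import java.util.List;

    public class RollingWindowDemo {
      static final int PRECEDING_SPAN = 2;      // rows kept behind the next row to process
      static List<String> window = new ArrayList<String>();
      static int received = 0;
      static int processed = 0;

      static void append(String row) {
        window.add(row);
        received++;
      }

      // Translate an absolute row index into an offset within the buffered window.
      static String getAt(int i) {
        int rangeStart = received - window.size();
        return window.get(i - rangeStart);
      }

      // Emit the next output row; once more than PRECEDING_SPAN rows have been
      // processed, the oldest buffered row can no longer be referenced and is dropped.
      static String nextOutputRow() {
        String row = getAt(processed++);
        if (processed > PRECEDING_SPAN) {
          window.remove(0);
        }
        return row;
      }

      public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
          append("x" + i);
          if (received > 1) {                   // pretend the frame ends 1 FOLLOWING
            System.out.println(nextOutputRow() + " emitted, buffered=" + window.size());
          }
        }
        // The buffer never grows beyond PRECEDING_SPAN + following + 1 = 4 rows.
      }
    }
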
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java Sat May 31 18:40:50 2014
@@ -26,7 +26,12 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.GenericUDAFSumDouble.SumDoubleAgg;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -152,6 +157,59 @@ public class GenericUDAFAverage extends 
       reset(result);
       return result;
     }
+
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+      BoundaryDef start = wFrmDef.getStart();
+      BoundaryDef end = wFrmDef.getEnd();
+
+      /*
+       * Currently we do not handle the dynamically sized windows implied by range-based windows.
+       */
+      if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+        return null;
+      }
+
+      /*
+       * Windows that are unbounded following don't benefit from Streaming.
+       */
+      if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return null;
+      }
+
+      return new GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>(this,
+          start.getAmt(), end.getAmt()) {
+
+        @Override
+        protected DoubleWritable getNextResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+            throws HiveException {
+          AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
+          Double r = myagg.count == 0 ? null : myagg.sum;
+          long cnt = myagg.count;
+          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+            Object[] o = ss.intermediateVals.remove(0);
+            Double d = o == null ? 0.0 : (Double) o[0];
+            r = r == null ? null : r - d;
+            cnt = cnt - ((Long) o[1]);
+          }
+
+          return r == null ? null : new DoubleWritable(r / cnt);
+        }
+
+        @Override
+        protected Object[] getCurrentIntermediateResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+            throws HiveException {
+          AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
+          return myagg.count == 0 ? null : new Object[] {
+              new Double(myagg.sum), myagg.count };
+        }
+
+      };
+    }
   }
 
   public static class GenericUDAFAverageEvaluatorDecimal extends AbstractGenericUDAFAverageEvaluator<HiveDecimal> {
@@ -241,6 +299,54 @@ public class GenericUDAFAverage extends 
       reset(result);
       return result;
     }
+
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+      BoundaryDef start = wFrmDef.getStart();
+      BoundaryDef end = wFrmDef.getEnd();
+
+      if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+        return null;
+      }
+
+      if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return null;
+      }
+
+      return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>(
+          this, start.getAmt(), end.getAmt()) {
+
+        @Override
+        protected HiveDecimalWritable getNextResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+            throws HiveException {
+          AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
+          HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
+          long cnt = myagg.count;
+          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+            Object[] o = ss.intermediateVals.remove(0);
+            HiveDecimal d = o == null ? HiveDecimal.ZERO : (HiveDecimal) o[0];
+            r = r == null ? null : r.subtract(d);
+            cnt = cnt - ((Long) o[1]);
+          }
+
+          return r == null ? null : new HiveDecimalWritable(
+              r.divide(HiveDecimal.create(cnt)));
+        }
+
+        @Override
+        protected Object[] getCurrentIntermediateResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+            throws HiveException {
+          AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
+          return myagg.count == 0 ? null : new Object[] { myagg.sum,
+              myagg.count };
+        }
+
+      };
+    }
   }
 
   private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {

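Both streaming average evaluators above follow the same arithmetic: keep a running (sum, count), queue a snapshot of it for each row, and once the frame is full subtract the snapshot that slid out of the window. A minimal plain-Java sketch of that computation for avg(x) OVER (ROWS BETWEEN p PRECEDING AND f FOLLOWING), assuming a bounded preceding span and non-null inputs; this is an illustration of the idea, not the Hive evaluator.

    import java.util.ArrayList;
    import java.util.List;

    public class StreamingAvgSketch {
      public static List<Double> slidingAvg(double[] vals, int p, int f) {
        List<Double> results = new ArrayList<Double>();
        List<double[]> snapshots = new ArrayList<double[]>();  // {sum, count} after each row
        double sum = 0;
        long count = 0;
        for (int numRows = 0; numRows < vals.length; numRows++) {
          sum += vals[numRows];                 // iterate() on the wrapped evaluator
          count++;
          if (numRows >= f) {                   // the result for row (numRows - f) is ready
            results.add(nextResult(snapshots, sum, count, numRows, p, f));
          }
          snapshots.add(new double[] { sum, count });
        }
        for (int i = 0; i < f; i++) {           // terminate(): flush the trailing results
          results.add(nextResult(snapshots, sum, count, vals.length, p, f));
        }
        return results;
      }

      private static double nextResult(List<double[]> snapshots, double sum, long count,
          int numRows, int p, int f) {
        double r = sum;
        long c = count;
        if (numRows - f >= p + 1) {             // a row has slid out of the frame
          double[] old = snapshots.remove(0);
          r -= old[0];
          c -= (long) old[1];
        }
        return r / c;
      }

      public static void main(String[] args) {
        // avg over ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING of 1,2,3,4 -> [1.5, 2.0, 3.0, 3.5]
        System.out.println(slidingAvg(new double[] { 1, 2, 3, 4 }, 1, 1));
      }
    }
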
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java Sat May 31 18:40:50 2014
@@ -53,12 +53,12 @@ public class GenericUDAFCumeDist extends
 	static final Log LOG = LogFactory.getLog(GenericUDAFCumeDist.class.getName());
 
 	@Override
-  protected GenericUDAFRankEvaluator createEvaluator()
+  protected GenericUDAFAbstractRankEvaluator createEvaluator()
 	{
 		return new GenericUDAFCumeDistEvaluator();
 	}
 
-  public static class GenericUDAFCumeDistEvaluator extends GenericUDAFRankEvaluator
+  public static class GenericUDAFCumeDistEvaluator extends GenericUDAFAbstractRankEvaluator
   {
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java Sat May 31 18:40:50 2014
@@ -43,7 +43,7 @@ public class GenericUDAFDenseRank extend
 	static final Log LOG = LogFactory.getLog(GenericUDAFDenseRank.class.getName());
 
 	@Override
-  protected GenericUDAFRankEvaluator createEvaluator()
+  protected GenericUDAFAbstractRankEvaluator createEvaluator()
 	{
 		return new GenericUDAFDenseRankEvaluator();
 	}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java Sat May 31 18:40:50 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -233,4 +234,24 @@ public abstract class GenericUDAFEvaluat
    */
   public abstract Object terminate(AggregationBuffer agg) throws HiveException;
 
+  /**
+   * When evaluating an aggregate over a fixed Window, the naive way to compute
+   * results is to compute the aggregate for each row. But often there is a way
+   * to compute results in a more efficient manner. This method enables the
+   * basic evaluator to provide a function object that does the job in a more
+   * efficient manner.
+   * <p>
+   * This method is called after this Evaluator is initialized. The returned
+   * Function must be initialized. It is passed the 'window' of aggregation for
+   * each row.
+   * 
+   * @param wFrmDef
+   *          the Window definition in play for this evaluation.
+   * @return null implies that this fn cannot be processed in Streaming mode. So
+   *         each row is evaluated independently.
+   */
+  public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+    return null;
+  }
+
 }

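The contract added above is consumer-driven: the caller probes each UDAF evaluator with the window frame and falls back to per-row evaluation over a buffered partition when it gets null back. A hedged sketch of that caller-side check (it mirrors what WindowingTableFunction does further down in this commit; the helper class and method are illustrative, not Hive API):

    import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;

    public class WindowingEvaluatorProbe {
      // Returns true when this function can be fed rows incrementally for this frame;
      // a null (or non-streaming) result means the frame must be evaluated per output row.
      static boolean canStream(GenericUDAFEvaluator fnEval, WindowFrameDef frame) {
        GenericUDAFEvaluator streaming = fnEval.getWindowingEvaluator(frame);
        return streaming instanceof ISupportStreamingModeForWindowing;
      }
    }
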
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java Sat May 31 18:40:50 2014
@@ -49,12 +49,12 @@ public class GenericUDAFPercentRank exte
 	static final Log LOG = LogFactory.getLog(GenericUDAFPercentRank.class.getName());
 
 	@Override
-  protected GenericUDAFRankEvaluator createEvaluator()
+  protected GenericUDAFAbstractRankEvaluator createEvaluator()
 	{
 		return new GenericUDAFPercentRankEvaluator();
 	}
 
-	public static class GenericUDAFPercentRankEvaluator extends GenericUDAFRankEvaluator
+	public static class GenericUDAFPercentRankEvaluator extends GenericUDAFAbstractRankEvaluator
 	{
 		@Override
 		public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java Sat May 31 18:40:50 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.UD
 import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -71,7 +72,7 @@ public class GenericUDAFRank extends Abs
 		return createEvaluator();
 	}
 
-	protected GenericUDAFRankEvaluator createEvaluator()
+	protected GenericUDAFAbstractRankEvaluator createEvaluator()
 	{
 		return new GenericUDAFRankEvaluator();
 	}
@@ -83,10 +84,12 @@ public class GenericUDAFRank extends Abs
 		Object[] currVal;
 		int currentRank;
 		int numParams;
+		boolean supportsStreaming;
 
-		RankBuffer(int numParams)
+		RankBuffer(int numParams, boolean supportsStreaming)
 		{
 			this.numParams = numParams;
+			this.supportsStreaming = supportsStreaming;
 			init();
 		}
 
@@ -96,20 +99,33 @@ public class GenericUDAFRank extends Abs
 			currentRowNum = 0;
 			currentRank = 0;
 			currVal = new Object[numParams];
+			if ( supportsStreaming ) {
+			  /* initialize rowNums to have 1 row */
+			  rowNums.add(null);
+			}
 		}
-
+		
 		void incrRowNum() { currentRowNum++; }
 
 		void addRank()
 		{
-			rowNums.add(new IntWritable(currentRank));
+		  if ( supportsStreaming ) {
+		    rowNums.set(0, new IntWritable(currentRank));
+		  } else {
+		    rowNums.add(new IntWritable(currentRank));
+		  }
 		}
 	}
 
-	public static class GenericUDAFRankEvaluator extends GenericUDAFEvaluator
+	public static abstract class GenericUDAFAbstractRankEvaluator extends GenericUDAFEvaluator 
 	{
 		ObjectInspector[] inputOI;
 		ObjectInspector[] outputOI;
+		boolean isStreamingMode = false;
+
+		protected boolean isStreaming() {
+		  return isStreamingMode;
+		}
 
 		@Override
 		public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException
@@ -132,7 +148,7 @@ public class GenericUDAFRank extends Abs
 		@Override
 		public AggregationBuffer getNewAggregationBuffer() throws HiveException
 		{
-			return new RankBuffer(inputOI.length);
+			return new RankBuffer(inputOI.length, isStreamingMode);
 		}
 
 		@Override
@@ -183,6 +199,23 @@ public class GenericUDAFRank extends Abs
 
 	}
 
+  public static class GenericUDAFRankEvaluator extends
+      GenericUDAFAbstractRankEvaluator implements
+      ISupportStreamingModeForWindowing {
+
+    @Override
+    public Object getNextResult(AggregationBuffer agg) throws HiveException {
+      return ((RankBuffer) agg).rowNums.get(0);
+    }
+
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+      isStreamingMode = true;
+      return this;
+    }
+
+  }
+
   public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2,
       ObjectInspector[] oi2)
   {

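The single-slot RankBuffer works because rank depends only on the current row number and on whether the ordering key changed from the previous row, so one result can be emitted per input row instead of pivoting a full list at partition end. A self-contained illustration of that property (plain Java, not Hive code):

    import java.util.Arrays;
    import java.util.List;

    public class StreamingRankSketch {
      public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(10, 10, 20, 30, 30, 30, 40);  // already ordered
        Integer prev = null;
        int rank = 0;
        int rowNum = 0;
        for (Integer k : keys) {
          rowNum++;
          if (!k.equals(prev)) {   // new key value: rank jumps to the current row number
            rank = rowNum;
          }
          prev = k;
          System.out.println("key=" + k + " rank=" + rank);  // prints 1, 1, 3, 4, 4, 4, 7
        }
      }
    }
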
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java Sat May 31 18:40:50 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+@SuppressWarnings({ "deprecation", "unchecked" })
+public abstract class GenericUDAFStreamingEnhancer<T1, T2> extends
+    GenericUDAFEvaluator implements ISupportStreamingModeForWindowing {
+
+  private final GenericUDAFEvaluator wrappedEval;
+  private final int numPreceding;
+  private final int numFollowing;
+
+  public GenericUDAFStreamingEnhancer(GenericUDAFEvaluator wrappedEval,
+      int numPreceding, int numFollowing) {
+    this.wrappedEval = wrappedEval;
+    this.numPreceding = numPreceding;
+    this.numFollowing = numFollowing;
+    this.mode = wrappedEval.mode;
+  }
+
+  class StreamingState extends AbstractAggregationBuffer {
+    final AggregationBuffer wrappedBuf;
+    final int numPreceding;
+    final int numFollowing;
+    final List<T1> results;
+    final List<T2> intermediateVals;
+    int numRows;
+
+    StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) {
+      this.wrappedBuf = buf;
+      this.numPreceding = numPreceding;
+      this.numFollowing = numFollowing;
+      results = new ArrayList<T1>();
+      intermediateVals = new ArrayList<T2>();
+      numRows = 0;
+    }
+
+    @Override
+    public int estimate() {
+      if (!(wrappedBuf instanceof AbstractAggregationBuffer)) {
+        return -1;
+      }
+      int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate();
+      if (underlying == -1) {
+        return -1;
+      }
+      if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return -1;
+      }
+      /*
+       * sz Estimate = sz needed by underlying AggBuffer +
+       *                 sz for results +
+       *                 sz for intermediates +
+       *                 3 * JavaDataModel.PRIMITIVES1
+       * sz of results = sz of underlying * wdwSz
+       * sz of intermediates = sz of underlying * wdwSz
+       */
+
+      int wdwSz = numPreceding + numFollowing + 1;
+      return underlying + 
+          (underlying * wdwSz) +
+          (underlying * wdwSz) +
+          (3 * JavaDataModel.PRIMITIVES1);
+    }
+  }
+
+  @Override
+  public ObjectInspector init(Mode m, ObjectInspector[] parameters)
+      throws HiveException {
+    throw new HiveException(getClass().getSimpleName() + ": init not supported");
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
+    return new StreamingState(numPreceding, numFollowing, underlying);
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    StreamingState ss = (StreamingState) agg;
+    wrappedEval.reset(ss.wrappedBuf);
+    ss.results.clear();
+    ss.intermediateVals.clear();
+    ss.numRows = 0;
+  }
+
+  @Override
+  public void iterate(AggregationBuffer agg, Object[] parameters)
+      throws HiveException {
+    StreamingState ss = (StreamingState) agg;
+
+    wrappedEval.iterate(ss.wrappedBuf, parameters);
+
+    if (ss.numRows >= ss.numFollowing) {
+      ss.results.add(getNextResult(ss));
+    }
+    if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) {
+      ss.intermediateVals.add(getCurrentIntermediateResult(ss));
+    }
+
+    ss.numRows++;
+  }
+
+  @Override
+  public Object terminate(AggregationBuffer agg) throws HiveException {
+    StreamingState ss = (StreamingState) agg;
+    Object o = wrappedEval.terminate(ss.wrappedBuf);
+
+    for (int i = 0; i < ss.numFollowing; i++) {
+      ss.results.add(getNextResult(ss));
+    }
+    return o;
+  }
+
+  @Override
+  public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+    throw new HiveException(getClass().getSimpleName()
+        + ": terminatePartial not supported");
+  }
+
+  @Override
+  public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+    throw new HiveException(getClass().getSimpleName()
+        + ": merge not supported");
+  }
+
+  @Override
+  public Object getNextResult(AggregationBuffer agg) throws HiveException {
+    StreamingState ss = (StreamingState) agg;
+    if (!ss.results.isEmpty()) {
+      T1 res = ss.results.remove(0);
+      if (res == null) {
+        return ISupportStreamingModeForWindowing.NULL_RESULT;
+      }
+      return res;
+    }
+    return null;
+  }
+
+  protected abstract T1 getNextResult(StreamingState ss) throws HiveException;
+
+  protected abstract T2 getCurrentIntermediateResult(StreamingState ss)
+      throws HiveException;
+}

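The enhancer's iterate()/terminate() pair implies a fixed emission schedule: with f rows FOLLOWING in the frame, the result for row i only becomes available once row i + f has been seen, and terminate() flushes the final f results when the partition ends. A small sketch that prints that schedule (illustrative only; the row counts are made up):

    public class EmissionScheduleSketch {
      public static void main(String[] args) {
        int n = 5;   // rows in the partition
        int f = 2;   // rows FOLLOWING in the window frame
        for (int numRows = 0; numRows < n; numRows++) {
          if (numRows >= f) {
            System.out.println("row " + numRows + " arrives -> result for row " + (numRows - f));
          } else {
            System.out.println("row " + numRows + " arrives -> nothing ready yet");
          }
        }
        for (int i = 0; i < f; i++) {
          System.out.println("terminate()      -> result for row " + (n - f + i));
        }
      }
    }
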
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java Sat May 31 18:40:50 2014
@@ -24,6 +24,10 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -179,6 +183,49 @@ public class GenericUDAFSum extends Abst
       return result;
     }
 
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+      BoundaryDef start = wFrmDef.getStart();
+      BoundaryDef end = wFrmDef.getEnd();
+
+      if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+        return null;
+      }
+
+      if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return null;
+      }
+
+      return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>(
+          this, start.getAmt(), end.getAmt()) {
+
+        @Override
+        protected HiveDecimalWritable getNextResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+            throws HiveException {
+          SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
+          HiveDecimal r = myagg.empty ? null : myagg.sum;
+          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+            HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0);
+            d = d == null ? HiveDecimal.ZERO : d;
+            r = r == null ? null : r.subtract(d);
+          }
+
+          return r == null ? null : new HiveDecimalWritable(r);
+        }
+
+        @Override
+        protected HiveDecimal getCurrentIntermediateResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+            throws HiveException {
+          SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
+          return myagg.empty ? null : myagg.sum;
+        }
+
+      };
+    }
   }
 
   /**
@@ -264,6 +311,50 @@ public class GenericUDAFSum extends Abst
       return result;
     }
 
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+      BoundaryDef start = wFrmDef.getStart();
+      BoundaryDef end = wFrmDef.getEnd();
+
+      if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+        return null;
+      }
+
+      if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return null;
+      }
+
+      return new GenericUDAFStreamingEnhancer<DoubleWritable, Double>(this,
+          start.getAmt(), end.getAmt()) {
+
+        @Override
+        protected DoubleWritable getNextResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+            throws HiveException {
+          SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
+          Double r = myagg.empty ? null : myagg.sum;
+          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+            Double d = (Double) ss.intermediateVals.remove(0);
+            d = d == null ? 0.0 : d;
+            r = r == null ? null : r - d;
+          }
+
+          return r == null ? null : new DoubleWritable(r);
+        }
+
+        @Override
+        protected Double getCurrentIntermediateResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+            throws HiveException {
+          SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
+          return myagg.empty ? null : new Double(myagg.sum);
+        }
+
+      };
+    }
+
   }
 
   /**
@@ -346,6 +437,49 @@ public class GenericUDAFSum extends Abst
       return result;
     }
 
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+      BoundaryDef start = wFrmDef.getStart();
+      BoundaryDef end = wFrmDef.getEnd();
+
+      if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+        return null;
+      }
+
+      if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return null;
+      }
+
+      return new GenericUDAFStreamingEnhancer<LongWritable, Long>(this,
+          start.getAmt(), end.getAmt()) {
+
+        @Override
+        protected LongWritable getNextResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+            throws HiveException {
+          SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
+          Long r = myagg.empty ? null : myagg.sum;
+          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+            Long d = (Long) ss.intermediateVals.remove(0);
+            d = d == null ? 0 : d;
+            r = r == null ? null : r - d;
+          }
+
+          return r == null ? null : new LongWritable(r);
+        }
+
+        @Override
+        protected Long getCurrentIntermediateResult(
+            org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+            throws HiveException {
+          SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
+          return myagg.empty ? null : new Long(myagg.sum);
+        }
+
+      };
+    }
   }
 
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java Sat May 31 18:40:50 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+/**
+ * A GenericUDAF evaluator that provides its results as a List to the
+ * {@link WindowingTableFunction} (i.e. one for which
+ * {@link WindowFunctionInfo#isPivotResult()} returns true) may support this
+ * interface. If it does, the WindowingTableFunction will ask it for the
+ * next result after every aggregate call.
+ */
+public interface ISupportStreamingModeForWindowing {
+
+  Object getNextResult(AggregationBuffer agg) throws HiveException;
+
+  public static Object NULL_RESULT = new Object();
+}

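Note the two-level null protocol this interface sets up: getNextResult returns plain null when no result is ready yet for the current row, and the shared NULL_RESULT sentinel when a result is ready but its value is SQL NULL. A hedged consumer-side fragment (fnEval and aggBuffer are assumed to be in scope, as they are in WindowingTableFunction.processRow later in this commit):

    // Assumed context: fnEval implements ISupportStreamingModeForWindowing,
    // aggBuffer is its AggregationBuffer for this function.
    Object out = ((ISupportStreamingModeForWindowing) fnEval).getNextResult(aggBuffer);
    if (out != null) {
      // A result is available; unwrap the sentinel back into a real SQL NULL.
      Object value = (out == ISupportStreamingModeForWindowing.NULL_RESULT) ? null : out;
      // ... append 'value' to this function's output column ...
    }
    // out == null: keep feeding rows; the result will surface on a later call.
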
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java Sat May 31 18:40:50 2014
@@ -21,37 +21,47 @@ package org.apache.hadoop.hive.ql.udf.pt
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class NoopStreaming extends Noop {
-  
+
   List<Object> rows;
-  
+  StructObjectInspector inputOI;
+
   NoopStreaming() {
     rows = new ArrayList<Object>();
   }
-  
-  public boolean canAcceptInputAsStream() {
-    return true;
-  } 
-  
+
+  public void initializeStreaming(Configuration cfg,
+      StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+    this.inputOI = inputOI;
+    canAcceptInputAsStream = true;
+  }
+
   public List<Object> processRow(Object row) throws HiveException {
-    if (!canAcceptInputAsStream() ) {
+    if (!canAcceptInputAsStream()) {
       throw new HiveException(String.format(
-          "Internal error: PTF %s, doesn't support Streaming",
-          getClass().getName()));
+          "Internal error: PTF %s, doesn't support Streaming", getClass()
+              .getName()));
     }
     rows.clear();
+    row = ObjectInspectorUtils.copyToStandardObject(row, inputOI,
+        ObjectInspectorCopyOption.WRITABLE);
     rows.add(row);
     return rows;
   }
-  
+
   public static class NoopStreamingResolver extends NoopResolver {
 
     @Override
-    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+        PartitionedTableFunctionDef tDef) {
       return new NoopStreaming();
     }
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java Sat May 31 18:40:50 2014
@@ -21,36 +21,46 @@ package org.apache.hadoop.hive.ql.udf.pt
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 
 public class NoopWithMapStreaming extends NoopWithMap {
   List<Object> rows;
-  
+  StructObjectInspector inputOI;
+
   NoopWithMapStreaming() {
     rows = new ArrayList<Object>();
   }
-  
-  public boolean canAcceptInputAsStream() {
-    return true;
-  } 
-  
+
+  public void initializeStreaming(Configuration cfg,
+      StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+    this.inputOI = inputOI;
+    canAcceptInputAsStream = true;
+  }
+
   public List<Object> processRow(Object row) throws HiveException {
-    if (!canAcceptInputAsStream() ) {
+    if (!canAcceptInputAsStream()) {
       throw new HiveException(String.format(
-          "Internal error: PTF %s, doesn't support Streaming",
-          getClass().getName()));
+          "Internal error: PTF %s, doesn't support Streaming", getClass()
+              .getName()));
     }
     rows.clear();
+    row = ObjectInspectorUtils.copyToStandardObject(row, inputOI,
+        ObjectInspectorCopyOption.WRITABLE);
     rows.add(row);
     return rows;
   }
-  
+
   public static class NoopWithMapStreamingResolver extends NoopWithMapResolver {
 
     @Override
-    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+        PartitionedTableFunctionDef tDef) {
       return new NoopStreaming();
     }
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Sat May 31 18:40:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.udf.pt
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /*
@@ -91,6 +93,7 @@ public abstract class TableFunctionEvalu
   protected PTFDesc ptfDesc;
   boolean transformsRawInput;
   transient protected PTFPartition outputPartition;
+  transient protected boolean canAcceptInputAsStream;
 
   static {
     PTFUtils.makeTransient(TableFunctionEvaluator.class, "outputOI", "rawInputOI");
@@ -215,9 +218,14 @@ public abstract class TableFunctionEvalu
    *   remaining o/p rows.
    */
   public boolean canAcceptInputAsStream() {
-    return false;
+    return canAcceptInputAsStream;
   }
-  
+
+  public void initializeStreaming(Configuration cfg,
+      StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+    canAcceptInputAsStream = false;
+  }
+
   public void startPartition() throws HiveException {
     if (!canAcceptInputAsStream() ) {
       throw new HiveException(String.format(

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Sat May 31 18:40:50 2014
@@ -25,9 +25,13 @@ import java.util.List;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -43,6 +47,9 @@ import org.apache.hadoop.hive.ql.plan.pt
 import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -52,6 +59,8 @@ import org.apache.hadoop.hive.serde2.obj
 @SuppressWarnings("deprecation")
 public class WindowingTableFunction extends TableFunctionEvaluator {
 
+  StreamingState streamingState;
+  
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
@@ -133,6 +142,251 @@ public class WindowingTableFunction exte
     return true;
   }
 
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#canAcceptInputAsStream
+   * ()
+   * 
+   * WindowingTableFunction supports streaming if every function meets one of
+   * these conditions: 1. The function implements ISupportStreamingModeForWindowing.
+   * 2. Its getWindowingEvaluator returns a non-null evaluator that implements
+   * ISupportStreamingModeForWindowing. 3. It is invoked on a 'fixed' window,
+   * i.e. no Unbounded Preceding or Following.
+   */
+  private int[] setCanAcceptInputAsStream(Configuration cfg) {
+
+    canAcceptInputAsStream = false;
+
+    if (ptfDesc.getLlInfo().getLeadLagExprs() != null) {
+      return null;
+    }
+
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+    int precedingSpan = 0;
+    int followingSpan = 0;
+
+    for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+      WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i);
+      WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
+      GenericUDAFEvaluator fnEval = wFnDef.getWFnEval();
+      GenericUDAFEvaluator streamingEval = fnEval
+          .getWindowingEvaluator(wdwFrame);
+      if (streamingEval != null
+          && streamingEval instanceof ISupportStreamingModeForWindowing) {
+        continue;
+      }
+      BoundaryDef start = wdwFrame.getStart();
+      BoundaryDef end = wdwFrame.getEnd();
+      if (!(end instanceof ValueBoundaryDef)
+          && !(start instanceof ValueBoundaryDef)) {
+        if (end.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT
+            && start.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT
+            && end.getDirection() != Direction.PRECEDING
+            && start.getDirection() != Direction.FOLLOWING) {
+
+          int amt = wdwFrame.getStart().getAmt();
+          if (amt > precedingSpan) {
+            precedingSpan = amt;
+          }
+
+          amt = wdwFrame.getEnd().getAmt();
+          if (amt > followingSpan) {
+            followingSpan = amt;
+          }
+          continue;
+        }
+      }
+      return null;
+    }
+    
+    int windowLimit = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+
+    if (windowLimit < (followingSpan + precedingSpan + 1)) {
+      return null;
+    }
+
+    canAcceptInputAsStream = true;
+    return new int[] {precedingSpan, followingSpan};
+  }
+
+  @Override
+  public void initializeStreaming(Configuration cfg,
+      StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+
+    int[] span = setCanAcceptInputAsStream(cfg);
+    if (!canAcceptInputAsStream) {
+      return;
+    }
+
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+
+    for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+      WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i);
+      WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
+      GenericUDAFEvaluator fnEval = wFnDef.getWFnEval();
+      GenericUDAFEvaluator streamingEval = fnEval
+          .getWindowingEvaluator(wdwFrame);
+      if (streamingEval != null) {
+        wFnDef.setWFnEval(streamingEval);
+        if (wFnDef.isPivotResult()) {
+          ListObjectInspector listOI = (ListObjectInspector) wFnDef.getOI();
+          wFnDef.setOI(listOI.getListElementObjectInspector());
+        }
+      }
+    }
+    streamingState = new StreamingState(cfg, inputOI, isMapSide, tabDef,
+        span[0], span[1]);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#startPartition()
+   */
+  @Override
+  public void startPartition() throws HiveException {
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+    streamingState.reset(tabDef);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#processRow(java
+   * .lang.Object)
+   * 
+   * - hand the row to each function, provided there are enough rows for the
+   * function's window. - call getNextResult on each function. - output as many
+   * rows as possible, based on the minimum size of the output lists.
+   */
+  @Override
+  public List<Object> processRow(Object row) throws HiveException {
+
+    streamingState.rollingPart.append(row);
+    row = streamingState.rollingPart
+        .getAt(streamingState.rollingPart.size() - 1);
+
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+
+    for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+      WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+      GenericUDAFEvaluator fnEval = wFn.getWFnEval();
+
+      int a = 0;
+      if (wFn.getArgs() != null) {
+        for (PTFExpressionDef arg : wFn.getArgs()) {
+          streamingState.funcArgs[i][a++] = arg.getExprEvaluator().evaluate(row);
+        }
+      }
+
+      if (fnEval instanceof ISupportStreamingModeForWindowing) {
+        fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]);
+        Object out = ((ISupportStreamingModeForWindowing) fnEval)
+            .getNextResult(streamingState.aggBuffers[i]);
+        if (out != null) {
+          streamingState.fnOutputs[i]
+              .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
+                  : out);
+        }
+      } else {
+        int rowToProcess = streamingState.rollingPart.rowToProcess(wFn);
+        if (rowToProcess >= 0) {
+          Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+              streamingState.order);
+          PTFPartitionIterator<Object> rItr = rng.iterator();
+          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+          Object out = evaluateWindowFunction(wFn, rItr);
+          streamingState.fnOutputs[i].add(out);
+        }
+      }
+    }
+
+    List<Object> oRows = new ArrayList<Object>();
+    while (streamingState.hasOutputRow()) {
+      oRows.add(streamingState.nextOutputRow());
+    }
+
+    return oRows.size() == 0 ? null : oRows;
+  }
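
processRow can only emit a row once every window function has produced a value for
it; a function with an f-row FOLLOWING bound necessarily lags f rows behind the
input. A toy sketch of that emit condition, using plain queues in place of
StreamingState.fnOutputs (all names below are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    public class EmitWhenAllReady {
      // mirrors StreamingState.hasOutputRow(): every function must have a pending result
      static boolean hasOutputRow(List<Queue<Object>> fnOutputs) {
        for (Queue<Object> q : fnOutputs) {
          if (q.isEmpty()) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        List<Queue<Object>> fnOutputs = new ArrayList<Queue<Object>>();
        fnOutputs.add(new LinkedList<Object>(Arrays.<Object>asList(1, 2, 3))); // 0 FOLLOWING: 3 results ready
        fnOutputs.add(new LinkedList<Object>(Arrays.<Object>asList(10)));      // 2 FOLLOWING: still lagging

        List<List<Object>> out = new ArrayList<List<Object>>();
        while (hasOutputRow(fnOutputs)) {        // same condition processRow loops on
          List<Object> row = new ArrayList<Object>();
          for (Queue<Object> q : fnOutputs) {
            row.add(q.poll());                   // one value per function, like nextOutputRow()
          }
          out.add(row);
        }
        System.out.println(out);                 // [[1, 10]] -- only one row is fully computed
      }
    }
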
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#finishPartition()
+   * 
+   * For functions that are not ISupportStreamingModeForWindowing, hand them the
+   * remaining rows (rows whose window span went beyond the end of the
+   * partition); for the rest of the functions invoke terminate and drain any
+   * remaining results.
+   * 
+   * Then, while numOutputRows < numInputRows, emit an output row; if some
+   * function cannot supply a value for one of these rows, flag it as an
+   * internal error.
+   */
+  @Override
+  public List<Object> finishPartition() throws HiveException {
+
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+    for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+      WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+      GenericUDAFEvaluator fnEval = wFn.getWFnEval();
+
+      int numRowsRemaining = wFn.getWindowFrame().getEnd().getAmt();
+      if (fnEval instanceof ISupportStreamingModeForWindowing) {
+        fnEval.terminate(streamingState.aggBuffers[i]);
+        if (numRowsRemaining != BoundarySpec.UNBOUNDED_AMOUNT) {
+          while (numRowsRemaining > 0) {
+            Object out = ((ISupportStreamingModeForWindowing) fnEval)
+                .getNextResult(streamingState.aggBuffers[i]);
+            if (out != null) {
+              streamingState.fnOutputs[i]
+                  .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
+                      : out);
+            }
+            numRowsRemaining--;
+          }
+        }
+      } else {
+        while (numRowsRemaining > 0) {
+          int rowToProcess = streamingState.rollingPart.size()
+              - numRowsRemaining;
+          Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+              streamingState.order);
+          PTFPartitionIterator<Object> rItr = rng.iterator();
+          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+          Object out = evaluateWindowFunction(wFn, rItr);
+          streamingState.fnOutputs[i].add(out);
+          numRowsRemaining--;
+        }
+      }
+
+    }
+
+    List<Object> oRows = new ArrayList<Object>();
+
+    while (!streamingState.rollingPart.processedAllRows()) {
+      boolean hasRow = streamingState.hasOutputRow();
+
+      if (!hasRow) {
+        throw new HiveException(
+            "Internal Error: cannot generate all output rows for a Partition");
+      }
+      oRows.add(streamingState.nextOutputRow());
+    }
+
+    return oRows.size() == 0 ? null : oRows;
+  }
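
With an f-row FOLLOWING bound the last f rows of a partition cannot be evaluated
until the partition is complete, and those are exactly the indices the loop above
walks. A small sketch of that index arithmetic, assuming a 10-row partition and a
4 FOLLOWING frame:

    public class FlushAtPartitionEnd {
      public static void main(String[] args) {
        int partitionSize = 10; // rows buffered in the rolling partition
        int following = 4;      // wdwFrame.getEnd().getAmt()
        for (int remaining = following; remaining > 0; remaining--) {
          int rowToProcess = partitionSize - remaining; // same arithmetic as above
          System.out.println("evaluate window function for row " + rowToProcess);
        }
        // prints rows 6, 7, 8, 9 -- the rows whose window ran past the partition end
      }
    }
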
+
   @Override
   public boolean canIterateOutput() {
     return true;
@@ -155,16 +409,19 @@ public class WindowingTableFunction exte
         Object out = evaluateWindowFunction(wFn, pItr);
         output.add(out);
       } else if (wFn.isPivotResult()) {
-        /*
-         * for functions that currently return the output as a List,
-         * for e.g. the ranking functions, lead/lag, ntile, cume_dist
-         * - for now continue to execute them here. The functions need to provide a way to get
-         *   each output row as we are iterating through the input. This is relative
-         *   easy to do for ranking functions; not possible for lead, ntile, cume_dist.
-         *
-         */
-        outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
-        output.add(null);
+        GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame());
+        if ( streamingEval != null && streamingEval instanceof ISupportStreamingModeForWindowing ) {
+          wFn.setWFnEval(streamingEval);
+          if ( wFn.getOI() instanceof ListObjectInspector ) {
+            ListObjectInspector listOI = (ListObjectInspector) wFn.getOI();
+            wFn.setOI(listOI.getListElementObjectInspector());
+          }
+          output.add(null);
+          wFnsWithWindows.add(i);  
+        } else {
+          outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+          output.add(null);
+        }
       } else {
         output.add(null);
         wFnsWithWindows.add(i);
@@ -884,8 +1141,10 @@ public class WindowingTableFunction exte
     Order order;
     PTFDesc ptfDesc;
     StructObjectInspector inputOI;
+    AggregationBuffer[] aggBuffers;
+    Object[][] args;
 
-    WindowingIterator(PTFPartition iPart, ArrayList<Object>  output,
+    WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
         List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) {
       this.iPart = iPart;
       this.output = output;
@@ -896,6 +1155,18 @@ public class WindowingTableFunction exte
       order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
       ptfDesc = getQueryDef();
       inputOI = iPart.getOutputOI();
+
+      aggBuffers = new AggregationBuffer[wTFnDef.getWindowFunctions().size()];
+      args = new Object[wTFnDef.getWindowFunctions().size()][];
+      try {
+        for (int j : wFnsToProcess) {
+          WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
+          aggBuffers[j] = wFn.getWFnEval().getNewAggregationBuffer();
+          args[j] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+        }
+      } catch (HiveException he) {
+        throw new RuntimeException(he);
+      }
     }
 
     @Override
@@ -915,10 +1186,25 @@ public class WindowingTableFunction exte
       try {
         for (int j : wFnsToProcess) {
           WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
-          Range rng = getRange(wFn, currIdx, iPart, order);
-          PTFPartitionIterator<Object> rItr = rng.iterator();
-          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-          output.set(j, evaluateWindowFunction(wFn, rItr));
+          if (wFn.getWFnEval() instanceof ISupportStreamingModeForWindowing) {
+            Object iRow = iPart.getAt(currIdx);
+            int a = 0;
+            if (wFn.getArgs() != null) {
+              for (PTFExpressionDef arg : wFn.getArgs()) {
+                args[j][a++] = arg.getExprEvaluator().evaluate(iRow);
+              }
+            }
+            wFn.getWFnEval().aggregate(aggBuffers[j], args[j]);
+            Object out = ((ISupportStreamingModeForWindowing) wFn.getWFnEval())
+                .getNextResult(aggBuffers[j]);
+            out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
+            output.set(j, out);
+          } else {
+            Range rng = getRange(wFn, currIdx, iPart, order);
+            PTFPartitionIterator<Object> rItr = rng.iterator();
+            PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+            output.set(j, evaluateWindowFunction(wFn, rItr));
+          }
         }
 
         Object iRow = iPart.getAt(currIdx);
@@ -942,4 +1228,72 @@ public class WindowingTableFunction exte
 
   }
 
+  class StreamingState {
+    PTFRollingPartition rollingPart;
+    List<Object>[] fnOutputs;
+    AggregationBuffer[] aggBuffers;
+    Object[][] funcArgs;
+    Order order;
+
+    @SuppressWarnings("unchecked")
+    StreamingState(Configuration cfg, StructObjectInspector inputOI,
+        boolean isMapSide, WindowTableFunctionDef tabDef, int precedingSpan,
+        int followingSpan) throws HiveException {
+      SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde()
+          : tabDef.getRawInputShape().getSerde();
+      StructObjectInspector outputOI = isMapSide ? tabDef.getInput()
+          .getOutputShape().getOI() : tabDef.getRawInputShape().getOI();
+      rollingPart = PTFPartition.createRolling(cfg, serde, inputOI, outputOI,
+          precedingSpan, followingSpan);
+
+      order = tabDef.getOrder().getExpressions().get(0).getOrder();
+
+      int numFns = tabDef.getWindowFunctions().size();
+      fnOutputs = new ArrayList[numFns];
+
+      aggBuffers = new AggregationBuffer[numFns];
+      funcArgs = new Object[numFns][];
+      for (int i = 0; i < numFns; i++) {
+        fnOutputs[i] = new ArrayList<Object>();
+        WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+        funcArgs[i] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+      }
+    }
+
+    void reset(WindowTableFunctionDef tabDef) throws HiveException {
+      int numFns = tabDef.getWindowFunctions().size();
+      rollingPart.reset();
+      for (int i = 0; i < fnOutputs.length; i++) {
+        fnOutputs[i].clear();
+      }
+
+      for (int i = 0; i < numFns; i++) {
+        WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+        aggBuffers[i] = wFn.getWFnEval().getNewAggregationBuffer();
+      }
+    }
+
+    boolean hasOutputRow() {
+      for (int i = 0; i < fnOutputs.length; i++) {
+        if (fnOutputs[i].size() == 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    List<Object> nextOutputRow() throws HiveException {
+      List<Object> oRow = new ArrayList<Object>();
+      Object iRow = rollingPart.nextOutputRow();
+      int i = 0;
+      for (; i < fnOutputs.length; i++) {
+        oRow.add(fnOutputs[i].remove(0));
+      }
+      for (StructField f : rollingPart.getOutputOI().getAllStructFieldRefs()) {
+        oRow.add(rollingPart.getOutputOI().getStructFieldData(iRow, f));
+      }
+      return oRow;
+    }
+  }
+
 }
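
The core idea of streaming mode is that a bounded ROWS frame lets a function emit
one result per input row while retaining only the current window. A self-contained
moving-sum sketch of that behaviour for a p PRECEDING / f FOLLOWING frame (a toy
model for illustration, not the ISupportStreamingModeForWindowing contract):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;

    public class MovingSumSketch {

      /** Sum over rows [r - preceding, r + following] for every row r, in one pass. */
      static List<Double> movingSums(List<Double> in, int preceding, int following) {
        Deque<Double> window = new ArrayDeque<Double>(); // at most preceding + following + 1 rows
        List<Double> out = new ArrayList<Double>();
        double sum = 0.0;
        int nextOut = 0;                                 // index of the next row to emit

        for (int i = 0; i < in.size(); i++) {
          window.addLast(in.get(i));                     // roughly PTFRollingPartition.append(row)
          sum += in.get(i);
          if (i - following >= nextOut) {                // row nextOut now has its full window buffered
            out.add(sum);
            nextOut++;
            if (window.size() > preceding + following) { // oldest row has left every remaining window
              sum -= window.removeFirst();
            }
          }
        }
        while (nextOut < in.size()) {                    // like finishPartition: flush rows whose
          out.add(sum);                                  // FOLLOWING span ran past the partition end
          if (nextOut - preceding >= 0) {
            sum -= window.removeFirst();
          }
          nextOut++;
        }
        return out;
      }

      public static void main(String[] args) {
        List<Double> in = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0);
        // 3 PRECEDING / 4 FOLLOWING: prints the numerators seen in TestStreamingAvg.testDouble_3_4
        // [15.0, 21.0, 28.0, 36.0, 44.0, 52.0, 49.0, 45.0, 40.0, 34.0]
        System.out.println(movingSums(in, 3, 4));
      }
    }
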

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java Sat May 31 18:40:50 2014
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udaf;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+public class TestStreamingAvg {
+
+  public void avgDouble(Iterator<Double> inVals, int inSz, int numPreceding,
+      int numFollowing, Iterator<Double> outVals) throws HiveException {
+
+    GenericUDAFAverage fnR = new GenericUDAFAverage();
+    TypeInfo[] inputTypes = { TypeInfoFactory.doubleTypeInfo };
+    ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableDoubleObjectInspector };
+
+    DoubleWritable[] in = new DoubleWritable[1];
+    in[0] = new DoubleWritable();
+
+    TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.DoubleHandler,
+        in, inputOIs, inSz, numPreceding, numFollowing, outVals);
+
+  }
+
+  public void avgHiveDecimal(Iterator<HiveDecimal> inVals, int inSz,
+      int numPreceding, int numFollowing, Iterator<HiveDecimal> outVals)
+      throws HiveException {
+
+    GenericUDAFAverage fnR = new GenericUDAFAverage();
+    TypeInfo[] inputTypes = { TypeInfoFactory.decimalTypeInfo };
+    ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector };
+
+    HiveDecimalWritable[] in = new HiveDecimalWritable[1];
+    in[0] = new HiveDecimalWritable();
+
+    TestStreamingSum._agg(fnR, inputTypes, inVals,
+        TypeHandler.HiveDecimalHandler, in, inputOIs, inSz, numPreceding,
+        numFollowing, outVals);
+
+  }
+
+  @Test
+  public void testDouble_3_4() throws HiveException {
+
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(15.0 / 5, 21.0 / 6, 28.0 / 7,
+        36.0 / 8, 44.0 / 8, 52.0 / 8, 49.0 / 7, 45.0 / 6, 40.0 / 5, 34.0 / 4);
+    avgDouble(inVals.iterator(), 10, 3, 4, outVals.iterator());
+  }
+
+  @Test
+  public void testHiveDecimal_3_4() throws HiveException {
+
+    List<HiveDecimal> inVals = Arrays
+        .asList(HiveDecimal.create(1L), HiveDecimal.create(2L),
+            HiveDecimal.create(3L), HiveDecimal.create(4L),
+            HiveDecimal.create(5L), HiveDecimal.create(6L),
+            HiveDecimal.create(7L), HiveDecimal.create(8L),
+            HiveDecimal.create(9L), HiveDecimal.create(10L));
+    List<HiveDecimal> outVals = Arrays.asList(
+        HiveDecimal.create(new BigDecimal(15.0 / 5)),
+        HiveDecimal.create(new BigDecimal(21.0 / 6)),
+        HiveDecimal.create(new BigDecimal(28.0 / 7)),
+        HiveDecimal.create(new BigDecimal(36.0 / 8)),
+        HiveDecimal.create(new BigDecimal(44.0 / 8)),
+        HiveDecimal.create(new BigDecimal(52.0 / 8)),
+        HiveDecimal.create(new BigDecimal(49.0 / 7)),
+        HiveDecimal.create(new BigDecimal(45.0 / 6)),
+        HiveDecimal.create(new BigDecimal(40.0 / 5)),
+        HiveDecimal.create(new BigDecimal(34.0 / 4)));
+    avgHiveDecimal(inVals.iterator(), 10, 3, 4, outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_3_0() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4,
+        14.0 / 4, 18.0 / 4, 22.0 / 4, 26.0 / 4, 30.0 / 4, 34.0 / 4);
+    avgDouble(inVals.iterator(), 10, 3, 0, outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_unb_0() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4,
+        15.0 / 5, 21.0 / 6, 28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10);
+    avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0,
+        outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_0_5() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(21.0 / 6, 27.0 / 6, 33.0 / 6,
+        39.0 / 6, 45.0 / 6, 40.0 / 5, 34.0 / 4, 27.0 / 3, 19.0 / 2, 10.0 / 1);
+    avgDouble(inVals.iterator(), 10, 0, 5, outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_unb_5() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(21.0 / 6, 28.0 / 7, 36.0 / 8,
+        45.0 / 9, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10,
+        55.0 / 10);
+    avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5,
+        outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_7_2() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(6.0 / 3, 10.0 / 4, 15.0 / 5, 21.0 / 6,
+        28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10, 54.0 / 9, 52.0 / 8);
+    avgDouble(inVals.iterator(), 10, 7, 2, outVals.iterator());
+  }
+
+  @Test
+  public void testDouble_15_15() throws HiveException {
+    List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+        9.0, 10.0);
+    List<Double> outVals = Arrays.asList(55.0 / 10, 55.0 / 10, 55.0 / 10,
+        55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10,
+        55.0 / 10);
+    avgDouble(inVals.iterator(), 10, 15, 15, outVals.iterator());
+  }
+}
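
The expected values in these tests follow from averaging each row's window, clipped
to the partition boundaries. A brute-force reference (not part of the test harness)
that reproduces the testDouble_3_4 expectations above:

    import java.util.Arrays;
    import java.util.List;

    public class ExpectedAvgReference {
      /** Average over rows [r - numPreceding, r + numFollowing], clipped to the partition. */
      static double[] expectedAvgs(List<Double> in, int numPreceding, int numFollowing) {
        double[] out = new double[in.size()];
        for (int r = 0; r < in.size(); r++) {
          int lo = Math.max(0, r - numPreceding);
          int hi = Math.min(in.size() - 1, r + numFollowing);
          double sum = 0.0;
          for (int i = lo; i <= hi; i++) {
            sum += in.get(i);
          }
          out[r] = sum / (hi - lo + 1);
        }
        return out;
      }

      public static void main(String[] args) {
        List<Double> in = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0);
        // matches testDouble_3_4: 15/5, 21/6, 28/7, 36/8, 44/8, 52/8, 49/7, 45/6, 40/5, 34/4
        System.out.println(Arrays.toString(expectedAvgs(in, 3, 4)));
      }
    }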