Posted to commits@hive.apache.org by ha...@apache.org on 2014/05/31 20:40:51 UTC
svn commit: r1598896 [1/2] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/udf/generic/
java/org/apache/hadoop/hive/ql/udf/ptf/
test/org/apache/hadoop/hive/ql/udaf/ test/results/clientpositive/
Author: hashutosh
Date: Sat May 31 18:40:50 2014
New Revision: 1598896
URL: http://svn.apache.org/r1598896
Log:
HIVE-7062 : Support Streaming mode in Windowing (Harish Butani via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
hive/trunk/ql/src/test/results/clientpositive/ptf.q.out
hive/trunk/ql/src/test/results/clientpositive/windowing.q.out
hive/trunk/ql/src/test/results/clientpositive/windowing_windowspec.q.out
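
The heart of this change is incremental evaluation of fixed windows: rather than buffering a whole partition and recomputing the aggregate for every row, the new GenericUDAFStreamingEnhancer keeps a running aggregate plus a bounded queue of per-row snapshots. Below is a minimal, self-contained sketch of that arithmetic (plain Java, not part of this commit; names are illustrative) for a sum over ROWS BETWEEN 2 PRECEDING AND CURRENT ROW:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SlidingSumSketch {
      public static void main(String[] args) {
        long[] rows = {3, 1, 4, 1, 5, 9};
        int preceding = 2;                            // ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        Deque<Long> snapshots = new ArrayDeque<>();   // running-sum snapshot per row
        long runningSum = 0;
        for (long v : rows) {
          runningSum += v;                            // like iterate(): fold the new row in
          snapshots.add(runningSum);                  // like getCurrentIntermediateResult()
          long windowSum = runningSum;
          if (snapshots.size() > preceding + 1) {
            // subtract the snapshot taken just before the window's first row
            windowSum -= snapshots.remove();
          }
          System.out.println("windowed sum = " + windowSum);
        }
      }
    }

Memory stays proportional to the window span rather than the partition size, which is what allows the PTF operator to stream rows through instead of materializing each partition.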
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Sat May 31 18:40:50 2014
@@ -83,6 +83,7 @@ public class PTFOperator extends Operato
setupKeysWrapper(inputObjInspectors[0]);
ptfInvocation = setupChain();
+ ptfInvocation.initializeStreaming(jobConf, isMapOperator);
firstMapRow = true;
super.initializeOp(jobConf);
@@ -282,6 +283,19 @@ public class PTFOperator extends Operato
return tabFn.canAcceptInputAsStream();
}
+ void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException {
+ PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
+ PTFInputDef inputDef = tabDef.getInput();
+ ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
+ inputObjInspectors[0] : inputDef.getOutputShape().getOI();
+
+ tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide);
+
+ if ( next != null ) {
+ next.initializeStreaming(cfg, isMapSide);
+ }
+ }
+
void startPartition() throws HiveException {
if ( isStreaming() ) {
tabFn.startPartition();
@@ -301,15 +315,6 @@ public class PTFOperator extends Operato
void processRow(Object row) throws HiveException {
if ( isStreaming() ) {
- if ( prev == null ) {
- /*
- * this is needed because during Translation we are still assuming that rows
- * are collected into a PTFPartition.
- * @Todo make translation handle the case when the first PTF is Streaming.
- */
- row = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0],
- ObjectInspectorCopyOption.WRITABLE);
- }
handleOutputRows(tabFn.processRow(row));
} else {
inputPart.append(row);
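
The initializeStreaming call added here walks the PTF invocation chain top-down: each link initializes its own table function for streaming and then forwards the call, so every PTF in a chained invocation gets a chance to switch modes before rows flow. A rough standalone sketch of that recursive hand-off (hypothetical Link class, not the real PTFInvocation):

    public class ChainSketch {
      static class Link {
        final String name;
        final Link next;
        Link(String name, Link next) { this.name = name; this.next = next; }
        void initializeStreaming(boolean isMapSide) {
          System.out.println("init " + name + " (mapSide=" + isMapSide + ")");
          if (next != null) {
            next.initializeStreaming(isMapSide);   // forward down the chain
          }
        }
      }
      public static void main(String[] args) {
        new Link("ptf1", new Link("ptf2", null)).initializeStreaming(false);
      }
    }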
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Sat May 31 18:40:50 2014
@@ -51,13 +51,25 @@ public class PTFPartition {
SerDe serDe, StructObjectInspector inputOI,
StructObjectInspector outputOI)
throws HiveException {
+ this(cfg, serDe, inputOI, outputOI, true);
+ }
+
+ protected PTFPartition(Configuration cfg,
+ SerDe serDe, StructObjectInspector inputOI,
+ StructObjectInspector outputOI,
+ boolean createElemContainer)
+ throws HiveException {
this.serDe = serDe;
this.inputOI = inputOI;
this.outputOI = outputOI;
- int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
- elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
- elems.setSerDe(serDe, outputOI);
- elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
+ if ( createElemContainer ) {
+ int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+ elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
+ elems.setSerDe(serDe, outputOI);
+ elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
+ } else {
+ elems = null;
+ }
}
public void reset() throws HiveException {
@@ -233,6 +245,16 @@ public class PTFPartition {
throws HiveException {
return new PTFPartition(cfg, serDe, inputOI, outputOI);
}
+
+ public static PTFRollingPartition createRolling(Configuration cfg,
+ SerDe serDe,
+ StructObjectInspector inputOI,
+ StructObjectInspector outputOI,
+ int precedingSpan,
+ int followingSpan)
+ throws HiveException {
+ return new PTFRollingPartition(cfg, serDe, inputOI, outputOI, precedingSpan, followingSpan);
+ }
public static StructObjectInspector setupPartitionOutputOI(SerDe serDe,
StructObjectInspector tblFnOI) throws SerDeException {
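
The new protected constructor exists so a subclass can opt out of allocating the spill-to-disk PTFRowContainer; the rolling partition added next keeps only a bounded window in memory and has no use for it. A standalone sketch of the pattern (hypothetical names, not Hive code):

    public class OptOutBufferSketch {
      static class Partition {
        final Object container;                  // stands in for PTFRowContainer
        Partition() { this(true); }
        protected Partition(boolean createContainer) {
          container = createContainer ? new Object() : null;
        }
      }
      static class RollingPartition extends Partition {
        RollingPartition() { super(false); }     // bounded in-memory window instead
      }
      public static void main(String[] args) {
        System.out.println("base allocates container: " + (new Partition().container != null));
        System.out.println("rolling skips it:         " + (new RollingPartition().container == null));
      }
    }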
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java Sat May 31 18:40:50 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class PTFRollingPartition extends PTFPartition {
+
+ /*
+ * number of rows whose output has been evaluated.
+ */
+ int numRowsProcessed;
+
+ /*
+ * number of rows to maintain before nextRowToProcess
+ */
+ int precedingSpan;
+
+ /*
+ * number of rows to maintain after nextRowToProcess
+ */
+ int followingSpan;
+
+ /*
+ * number of rows received.
+ */
+ int numRowsReceived;
+
+ /*
+ * State of the Rolling Partition
+ *
+ * x0 x1 x2 x3 x4 x5 x6 x7 x8 x9 x10 x11 x12 x13 x14 x15 x16 x17
+ * ^ ^ ^
+ * | | |
+ * |--preceding span--numRowsProcessed---followingSpan --numRowsReceived
+ *
+ * a. index x7 represents the last output row
+ * b. the precedingSpan rows before it are still held for processing subsequent rows
+ * c. the number of rows beyond numRowsProcessed equals followingSpan
+ */
+
+ /*
+ * cache of rows; guaranteed to contain precedingSpan rows before
+ * nextRowToProcess.
+ */
+ List<Object> currWindow;
+
+ protected PTFRollingPartition(Configuration cfg, SerDe serDe,
+ StructObjectInspector inputOI, StructObjectInspector outputOI,
+ int precedingSpan, int succeedingSpan) throws HiveException {
+ super(cfg, serDe, inputOI, outputOI, false);
+ this.precedingSpan = precedingSpan;
+ this.followingSpan = succeedingSpan;
+ currWindow = new ArrayList<Object>(precedingSpan + followingSpan);
+ }
+
+ public void reset() throws HiveException {
+ currWindow.clear();
+ numRowsProcessed = 0;
+ numRowsReceived = 0;
+ }
+
+ public Object getAt(int i) throws HiveException {
+ int rangeStart = numRowsReceived - currWindow.size();
+ return currWindow.get(i - rangeStart);
+ }
+
+ public void append(Object o) throws HiveException {
+ @SuppressWarnings("unchecked")
+ List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+ o, inputOI, ObjectInspectorCopyOption.WRITABLE);
+ currWindow.add(l);
+ numRowsReceived++;
+ }
+
+ public Object nextOutputRow() throws HiveException {
+ Object row = getAt(numRowsProcessed);
+ numRowsProcessed++;
+ if (numRowsProcessed > precedingSpan) {
+ currWindow.remove(0);
+ }
+ return row;
+ }
+
+ public boolean processedAllRows() {
+ return numRowsProcessed >= numRowsReceived;
+ }
+
+ public int rowToProcess(WindowFunctionDef wFn) {
+ int rowToProcess = numRowsReceived - wFn.getWindowFrame().getEnd().getAmt()
+ - 1;
+ return rowToProcess >= 0 ? rowToProcess : -1;
+ }
+
+ public int size() {
+ return numRowsReceived;
+ }
+
+ public PTFPartitionIterator<Object> iterator() throws HiveException {
+ return new RollingPItr();
+ }
+
+ public void close() {
+ }
+
+ class RollingPItr implements PTFPartitionIterator<Object> {
+
+ @Override
+ public boolean hasNext() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object next() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getIndex() {
+ return PTFRollingPartition.this.numRowsProcessed;
+ }
+
+ @Override
+ public Object lead(int amt) throws HiveException {
+ int i = PTFRollingPartition.this.numRowsProcessed + amt;
+ i = i >= PTFRollingPartition.this.numRowsReceived ? PTFRollingPartition.this.numRowsReceived - 1
+ : i;
+ return PTFRollingPartition.this.getAt(i);
+ }
+
+ @Override
+ public Object lag(int amt) throws HiveException {
+ int i = PTFRollingPartition.this.numRowsProcessed - amt;
+ int start = PTFRollingPartition.this.numRowsReceived
+ - PTFRollingPartition.this.currWindow.size();
+
+ i = i < start ? start : i;
+ return PTFRollingPartition.this.getAt(i);
+ }
+
+ @Override
+ public Object resetToIndex(int idx) throws HiveException {
+ return PTFRollingPartition.this.getAt(idx);
+ }
+
+ @Override
+ public PTFPartition getPartition() {
+ return PTFRollingPartition.this;
+ }
+
+ @Override
+ public void reset() throws HiveException {
+ }
+
+ }
+}
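
PTFRollingPartition addresses rows by their global index in the partition even though only a bounded slice is buffered: getAt translates a global index through rangeStart = numRowsReceived - currWindow.size(), and nextOutputRow evicts the head once it can no longer fall inside any preceding span. A standalone trace of the same bookkeeping (illustrative; precedingSpan = 1, followingSpan = 0):

    import java.util.ArrayList;
    import java.util.List;

    public class RollingIndexSketch {
      static List<String> window = new ArrayList<>();
      static int numRowsReceived = 0, numRowsProcessed = 0;
      static final int PRECEDING = 1;

      static String getAt(int i) {                    // same arithmetic as PTFRollingPartition.getAt
        int rangeStart = numRowsReceived - window.size();
        return window.get(i - rangeStart);
      }

      public static void main(String[] args) {
        for (String row : new String[] {"r0", "r1", "r2", "r3"}) {
          window.add(row);
          numRowsReceived++;
          String out = getAt(numRowsProcessed);       // like nextOutputRow()
          numRowsProcessed++;
          if (numRowsProcessed > PRECEDING) {
            window.remove(0);                         // same eviction rule
          }
          System.out.println("output " + out + ", buffered " + window.size() + " row(s)");
        }
      }
    }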
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java Sat May 31 18:40:50 2014
@@ -26,7 +26,12 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.GenericUDAFSumDouble.SumDoubleAgg;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -152,6 +157,59 @@ public class GenericUDAFAverage extends
reset(result);
return result;
}
+
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+
+ /*
+ * Currently we are not handling dynamic sized windows implied by range based windows.
+ */
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return null;
+ }
+
+ /*
+ * Windows that are unbounded following don't benefit from Streaming.
+ */
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return null;
+ }
+
+ return new GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>(this,
+ start.getAmt(), end.getAmt()) {
+
+ @Override
+ protected DoubleWritable getNextResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+ throws HiveException {
+ AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
+ Double r = myagg.count == 0 ? null : myagg.sum;
+ long cnt = myagg.count;
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+ Object[] o = ss.intermediateVals.remove(0);
+ Double d = o == null ? 0.0 : (Double) o[0];
+ r = r == null ? null : r - d;
+ cnt = cnt - ((Long) o[1]);
+ }
+
+ return r == null ? null : new DoubleWritable(r / cnt);
+ }
+
+ @Override
+ protected Object[] getCurrentIntermediateResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+ throws HiveException {
+ AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
+ return myagg.count == 0 ? null : new Object[] {
+ new Double(myagg.sum), myagg.count };
+ }
+
+ };
+ }
}
public static class GenericUDAFAverageEvaluatorDecimal extends AbstractGenericUDAFAverageEvaluator<HiveDecimal> {
@@ -241,6 +299,54 @@ public class GenericUDAFAverage extends
reset(result);
return result;
}
+
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return null;
+ }
+
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return null;
+ }
+
+ return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>(
+ this, start.getAmt(), end.getAmt()) {
+
+ @Override
+ protected HiveDecimalWritable getNextResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+ throws HiveException {
+ AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
+ HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
+ long cnt = myagg.count;
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+ Object[] o = ss.intermediateVals.remove(0);
+ HiveDecimal d = o == null ? HiveDecimal.ZERO : (HiveDecimal) o[0];
+ r = r == null ? null : r.subtract(d);
+ cnt = cnt - ((Long) o[1]);
+ }
+
+ return r == null ? null : new HiveDecimalWritable(
+ r.divide(HiveDecimal.create(cnt)));
+ }
+
+ @Override
+ protected Object[] getCurrentIntermediateResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+ throws HiveException {
+ AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
+ return myagg.count == 0 ? null : new Object[] { myagg.sum,
+ myagg.count };
+ }
+
+ };
+ }
}
private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {
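
The average enhancer's intermediates are cumulative (sum, count) snapshots, one per row; retiring the snapshot recorded just before the window's first row subtracts both the sum and the row count in a single step, and a zero count maps to a NULL result. A standalone sketch of the same arithmetic (plain doubles, non-null inputs; not Hive code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SlidingAvgSketch {
      public static void main(String[] args) {
        double[] rows = {2, 4, 6, 8};
        int preceding = 1;                              // ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        Deque<double[]> snapshots = new ArrayDeque<>(); // cumulative {sum, count} per row
        double cumSum = 0;
        long cumCount = 0;
        for (double v : rows) {
          cumSum += v;
          cumCount++;
          snapshots.add(new double[] {cumSum, cumCount});
          double wSum = cumSum;
          long wCount = cumCount;
          if (snapshots.size() > preceding + 1) {
            double[] old = snapshots.remove();          // state just before the window
            wSum -= old[0];
            wCount -= (long) old[1];
          }
          System.out.println("windowed avg = " + (wCount == 0 ? "NULL" : wSum / wCount));
        }
      }
    }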
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java Sat May 31 18:40:50 2014
@@ -53,12 +53,12 @@ public class GenericUDAFCumeDist extends
static final Log LOG = LogFactory.getLog(GenericUDAFCumeDist.class.getName());
@Override
- protected GenericUDAFRankEvaluator createEvaluator()
+ protected GenericUDAFAbstractRankEvaluator createEvaluator()
{
return new GenericUDAFCumeDistEvaluator();
}
- public static class GenericUDAFCumeDistEvaluator extends GenericUDAFRankEvaluator
+ public static class GenericUDAFCumeDistEvaluator extends GenericUDAFAbstractRankEvaluator
{
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java Sat May 31 18:40:50 2014
@@ -43,7 +43,7 @@ public class GenericUDAFDenseRank extend
static final Log LOG = LogFactory.getLog(GenericUDAFDenseRank.class.getName());
@Override
- protected GenericUDAFRankEvaluator createEvaluator()
+ protected GenericUDAFAbstractRankEvaluator createEvaluator()
{
return new GenericUDAFDenseRankEvaluator();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java Sat May 31 18:40:50 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -233,4 +234,24 @@ public abstract class GenericUDAFEvaluat
*/
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
+ /**
+ * When evaluating an aggregate over a fixed Window, the naive way to compute
+ * results is to recompute the aggregate for each row. Often the result can be
+ * computed in a more efficient, incremental manner. This method enables the
+ * basic evaluator to provide a function object that does so.
+ * <p>
+ * This method is called after this Evaluator is initialized. The returned
+ * Function must be initialized. It is passed the 'window' of aggregation for
+ * each row.
+ *
+ * @param wFrmDef
+ * the Window definition in play for this evaluation.
+ * @return null implies that this function cannot be processed in Streaming
+ * mode, so each row is evaluated independently.
+ */
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+ return null;
+ }
+
}
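
The contract is deliberately loose: the hook may return null, in which case the caller must fall back to evaluating each row independently over its full window. A standalone sketch of that dispatch (hypothetical Evaluator interface, not the Hive API):

    public class HookContractSketch {
      interface Evaluator { String describe(); }

      // stands in for GenericUDAFEvaluator.getWindowingEvaluator(wFrmDef)
      static Evaluator getWindowingEvaluator(boolean frameIsStreamable) {
        return frameIsStreamable ? () -> "streaming: O(window) memory" : null;
      }

      public static void main(String[] args) {
        for (boolean streamable : new boolean[] {true, false}) {
          Evaluator e = getWindowingEvaluator(streamable);
          System.out.println(e != null ? e.describe()
              : "null: evaluate each row independently over its full window");
        }
      }
    }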
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java Sat May 31 18:40:50 2014
@@ -49,12 +49,12 @@ public class GenericUDAFPercentRank exte
static final Log LOG = LogFactory.getLog(GenericUDAFPercentRank.class.getName());
@Override
- protected GenericUDAFRankEvaluator createEvaluator()
+ protected GenericUDAFAbstractRankEvaluator createEvaluator()
{
return new GenericUDAFPercentRankEvaluator();
}
- public static class GenericUDAFPercentRankEvaluator extends GenericUDAFRankEvaluator
+ public static class GenericUDAFPercentRankEvaluator extends GenericUDAFAbstractRankEvaluator
{
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java Sat May 31 18:40:50 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.UD
import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -71,7 +72,7 @@ public class GenericUDAFRank extends Abs
return createEvaluator();
}
- protected GenericUDAFRankEvaluator createEvaluator()
+ protected GenericUDAFAbstractRankEvaluator createEvaluator()
{
return new GenericUDAFRankEvaluator();
}
@@ -83,10 +84,12 @@ public class GenericUDAFRank extends Abs
Object[] currVal;
int currentRank;
int numParams;
+ boolean supportsStreaming;
- RankBuffer(int numParams)
+ RankBuffer(int numParams, boolean supportsStreaming)
{
this.numParams = numParams;
+ this.supportsStreaming = supportsStreaming;
init();
}
@@ -96,20 +99,33 @@ public class GenericUDAFRank extends Abs
currentRowNum = 0;
currentRank = 0;
currVal = new Object[numParams];
+ if ( supportsStreaming ) {
+ /* initialize rowNums to have 1 row */
+ rowNums.add(null);
+ }
}
-
+
void incrRowNum() { currentRowNum++; }
void addRank()
{
- rowNums.add(new IntWritable(currentRank));
+ if ( supportsStreaming ) {
+ rowNums.set(0, new IntWritable(currentRank));
+ } else {
+ rowNums.add(new IntWritable(currentRank));
+ }
}
}
- public static class GenericUDAFRankEvaluator extends GenericUDAFEvaluator
+ public static abstract class GenericUDAFAbstractRankEvaluator extends GenericUDAFEvaluator
{
ObjectInspector[] inputOI;
ObjectInspector[] outputOI;
+ boolean isStreamingMode = false;
+
+ protected boolean isStreaming() {
+ return isStreamingMode;
+ }
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException
@@ -132,7 +148,7 @@ public class GenericUDAFRank extends Abs
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException
{
- return new RankBuffer(inputOI.length);
+ return new RankBuffer(inputOI.length, isStreamingMode);
}
@Override
@@ -183,6 +199,23 @@ public class GenericUDAFRank extends Abs
}
+ public static class GenericUDAFRankEvaluator extends
+ GenericUDAFAbstractRankEvaluator implements
+ ISupportStreamingModeForWindowing {
+
+ @Override
+ public Object getNextResult(AggregationBuffer agg) throws HiveException {
+ return ((RankBuffer) agg).rowNums.get(0);
+ }
+
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+ isStreamingMode = true;
+ return this;
+ }
+
+ }
+
public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2,
ObjectInspector[] oi2)
{
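
rank() can stream with the single-slot RankBuffer above because its value depends only on rows already seen; each row's result is final the moment the row arrives. A standalone sketch of the per-row emission (illustrative; assumes the partition arrives sorted):

    public class StreamingRankSketch {
      public static void main(String[] args) {
        int[] sortedKeys = {10, 10, 20, 30, 30, 30};   // partition already ordered
        int rowNum = 0, rank = 0;
        Integer prevKey = null;
        for (int key : sortedKeys) {
          rowNum++;
          if (prevKey == null || key != prevKey) {
            rank = rowNum;                             // new key: rank jumps to the row number
          }
          prevKey = key;
          System.out.println("key=" + key + " rank=" + rank);  // emitted per row, nothing buffered
        }
      }
    }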
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java Sat May 31 18:40:50 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+@SuppressWarnings({ "deprecation", "unchecked" })
+public abstract class GenericUDAFStreamingEnhancer<T1, T2> extends
+ GenericUDAFEvaluator implements ISupportStreamingModeForWindowing {
+
+ private final GenericUDAFEvaluator wrappedEval;
+ private final int numPreceding;
+ private final int numFollowing;
+
+ public GenericUDAFStreamingEnhancer(GenericUDAFEvaluator wrappedEval,
+ int numPreceding, int numFollowing) {
+ this.wrappedEval = wrappedEval;
+ this.numPreceding = numPreceding;
+ this.numFollowing = numFollowing;
+ this.mode = wrappedEval.mode;
+ }
+
+ class StreamingState extends AbstractAggregationBuffer {
+ final AggregationBuffer wrappedBuf;
+ final int numPreceding;
+ final int numFollowing;
+ final List<T1> results;
+ final List<T2> intermediateVals;
+ int numRows;
+
+ StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) {
+ this.wrappedBuf = buf;
+ this.numPreceding = numPreceding;
+ this.numFollowing = numFollowing;
+ results = new ArrayList<T1>();
+ intermediateVals = new ArrayList<T2>();
+ numRows = 0;
+ }
+
+ @Override
+ public int estimate() {
+ if (!(wrappedBuf instanceof AbstractAggregationBuffer)) {
+ return -1;
+ }
+ int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate();
+ if (underlying == -1) {
+ return -1;
+ }
+ if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return -1;
+ }
+ /*
+ * sz Estimate = sz needed by underlying AggBuffer +
+ * sz for results +
+ * sz for intermediates +
+ * 3 * JavaDataModel.PRIMITIVES1
+ * sz of results = sz of underlying * wdwSz
+ * sz of intermediates = sz of underlying * wdwSz
+ */
+
+ int wdwSz = numPreceding + numFollowing + 1;
+ return underlying +
+ (underlying * wdwSz) +
+ (underlying * wdwSz) +
+ (3 * JavaDataModel.PRIMITIVES1);
+ }
+ }
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters)
+ throws HiveException {
+ throw new HiveException(getClass().getSimpleName() + ": init not supported");
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
+ return new StreamingState(numPreceding, numFollowing, underlying);
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+ wrappedEval.reset(ss.wrappedBuf);
+ ss.results.clear();
+ ss.intermediateVals.clear();
+ ss.numRows = 0;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters)
+ throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+
+ wrappedEval.iterate(ss.wrappedBuf, parameters);
+
+ if (ss.numRows >= ss.numFollowing) {
+ ss.results.add(getNextResult(ss));
+ }
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) {
+ ss.intermediateVals.add(getCurrentIntermediateResult(ss));
+ }
+
+ ss.numRows++;
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+ Object o = wrappedEval.terminate(ss.wrappedBuf);
+
+ for (int i = 0; i < ss.numFollowing; i++) {
+ ss.results.add(getNextResult(ss));
+ }
+ return o;
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ throw new HiveException(getClass().getSimpleName()
+ + ": terminatePartial not supported");
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ throw new HiveException(getClass().getSimpleName()
+ + ": merge not supported");
+ }
+
+ @Override
+ public Object getNextResult(AggregationBuffer agg) throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+ if (!ss.results.isEmpty()) {
+ T1 res = ss.results.remove(0);
+ if (res == null) {
+ return ISupportStreamingModeForWindowing.NULL_RESULT;
+ }
+ return res;
+ }
+ return null;
+ }
+
+ protected abstract T1 getNextResult(StreamingState ss) throws HiveException;
+
+ protected abstract T2 getCurrentIntermediateResult(StreamingState ss)
+ throws HiveException;
+}
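
The enhancer's output timing follows from the FOLLOWING bound: with f rows following, row i's result is final only once row i+f has been seen, so iterate() withholds the first f results and terminate() flushes the tail. A standalone trace of that schedule (following = 2, five input rows; not Hive code):

    public class OutputTimingSketch {
      public static void main(String[] args) {
        int following = 2, numRows = 5;
        for (int i = 0; i < numRows; i++) {
          // mirrors iterate(): if (ss.numRows >= ss.numFollowing) ss.results.add(...)
          if (i >= following) {
            System.out.println("after input row " + i + ": emit result for row " + (i - following));
          } else {
            System.out.println("after input row " + i + ": no result ready yet");
          }
        }
        // mirrors terminate(): the last 'following' results are now final
        for (int i = numRows - following; i < numRows; i++) {
          System.out.println("at partition end: emit result for row " + i);
        }
      }
    }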
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java Sat May 31 18:40:50 2014
@@ -24,6 +24,10 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -179,6 +183,49 @@ public class GenericUDAFSum extends Abst
return result;
}
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return null;
+ }
+
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return null;
+ }
+
+ return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>(
+ this, start.getAmt(), end.getAmt()) {
+
+ @Override
+ protected HiveDecimalWritable getNextResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+ throws HiveException {
+ SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
+ HiveDecimal r = myagg.empty ? null : myagg.sum;
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+ HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0);
+ d = d == null ? HiveDecimal.ZERO : d;
+ r = r == null ? null : r.subtract(d);
+ }
+
+ return r == null ? null : new HiveDecimalWritable(r);
+ }
+
+ @Override
+ protected HiveDecimal getCurrentIntermediateResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+ throws HiveException {
+ SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
+ return myagg.empty ? null : myagg.sum;
+ }
+
+ };
+ }
}
/**
@@ -264,6 +311,50 @@ public class GenericUDAFSum extends Abst
return result;
}
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return null;
+ }
+
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return null;
+ }
+
+ return new GenericUDAFStreamingEnhancer<DoubleWritable, Double>(this,
+ start.getAmt(), end.getAmt()) {
+
+ @Override
+ protected DoubleWritable getNextResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+ throws HiveException {
+ SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
+ Double r = myagg.empty ? null : myagg.sum;
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+ Double d = (Double) ss.intermediateVals.remove(0);
+ d = d == null ? 0.0 : d;
+ r = r == null ? null : r - d;
+ }
+
+ return r == null ? null : new DoubleWritable(r);
+ }
+
+ @Override
+ protected Double getCurrentIntermediateResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+ throws HiveException {
+ SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
+ return myagg.empty ? null : new Double(myagg.sum);
+ }
+
+ };
+ }
+
}
/**
@@ -346,6 +437,49 @@ public class GenericUDAFSum extends Abst
return result;
}
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return null;
+ }
+
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return null;
+ }
+
+ return new GenericUDAFStreamingEnhancer<LongWritable, Long>(this,
+ start.getAmt(), end.getAmt()) {
+
+ @Override
+ protected LongWritable getNextResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+ throws HiveException {
+ SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
+ Long r = myagg.empty ? null : myagg.sum;
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+ Long d = (Long) ss.intermediateVals.remove(0);
+ d = d == null ? 0 : d;
+ r = r == null ? null : r - d;
+ }
+
+ return r == null ? null : new LongWritable(r);
+ }
+
+ @Override
+ protected Long getCurrentIntermediateResult(
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+ throws HiveException {
+ SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
+ return myagg.empty ? null : new Long(myagg.sum);
+ }
+
+ };
+ }
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java Sat May 31 18:40:50 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+/**
+ * A GenericUDAF evaluator that provides its results as a List to the
+ * {@link WindowingTableFunction} (i.e., one for which
+ * {@link WindowFunctionInfo#isPivotResult()} returns true) may support this
+ * interface. If it does, the WindowingTableFunction will ask it for the
+ * next result after every aggregate call.
+ */
+public interface ISupportStreamingModeForWindowing {
+
+ Object getNextResult(AggregationBuffer agg) throws HiveException;
+
+ public static Object NULL_RESULT = new Object();
+}
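
Plain null already means "no result ready yet" in getNextResult, so a distinct sentinel is needed to say "the result is SQL NULL". A standalone sketch of the three-state protocol (illustrative, not Hive code):

    public class SentinelSketch {
      static final Object NULL_RESULT = new Object();

      static Object nextResult(int step) {
        switch (step) {
          case 0:  return "value";       // a real result
          case 1:  return NULL_RESULT;   // the result is SQL NULL
          default: return null;          // nothing available yet
        }
      }

      public static void main(String[] args) {
        for (int step = 0; step < 3; step++) {
          Object out = nextResult(step);
          if (out == null) {
            System.out.println("wait for more input");
          } else {
            System.out.println("emit " + (out == NULL_RESULT ? "NULL" : out));
          }
        }
      }
    }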
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java Sat May 31 18:40:50 2014
@@ -21,37 +21,47 @@ package org.apache.hadoop.hive.ql.udf.pt
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
public class NoopStreaming extends Noop {
-
+
List<Object> rows;
-
+ StructObjectInspector inputOI;
+
NoopStreaming() {
rows = new ArrayList<Object>();
}
-
- public boolean canAcceptInputAsStream() {
- return true;
- }
-
+
+ public void initializeStreaming(Configuration cfg,
+ StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+ this.inputOI = inputOI;
+ canAcceptInputAsStream = true;
+ }
+
public List<Object> processRow(Object row) throws HiveException {
- if (!canAcceptInputAsStream() ) {
+ if (!canAcceptInputAsStream()) {
throw new HiveException(String.format(
- "Internal error: PTF %s, doesn't support Streaming",
- getClass().getName()));
+ "Internal error: PTF %s, doesn't support Streaming", getClass()
+ .getName()));
}
rows.clear();
+ row = ObjectInspectorUtils.copyToStandardObject(row, inputOI,
+ ObjectInspectorCopyOption.WRITABLE);
rows.add(row);
return rows;
}
-
+
public static class NoopStreamingResolver extends NoopResolver {
@Override
- protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+ protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+ PartitionedTableFunctionDef tDef) {
return new NoopStreaming();
}
}
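
The copyToStandardObject call added above matters because upstream operators reuse the same row object across processRow calls; retaining a reference without a deep copy would make every buffered row alias the most recent input. A standalone demonstration of the hazard (illustrative, using a reused Object[] as the row):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RowReuseSketch {
      public static void main(String[] args) {
        Object[] reusedRow = new Object[1];            // the "operator" reuses this buffer
        List<Object> byReference = new ArrayList<>();
        List<Object> byCopy = new ArrayList<>();
        for (String v : new String[] {"a", "b"}) {
          reusedRow[0] = v;
          byReference.add(reusedRow);                  // buggy: aliases the shared buffer
          byCopy.add(reusedRow.clone());               // what the deep copy achieves
        }
        System.out.println("by reference: " + Arrays.toString((Object[]) byReference.get(0))); // [b]
        System.out.println("by copy:      " + Arrays.toString((Object[]) byCopy.get(0)));      // [a]
      }
    }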
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java Sat May 31 18:40:50 2014
@@ -21,36 +21,46 @@ package org.apache.hadoop.hive.ql.udf.pt
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
public class NoopWithMapStreaming extends NoopWithMap {
List<Object> rows;
-
+ StructObjectInspector inputOI;
+
NoopWithMapStreaming() {
rows = new ArrayList<Object>();
}
-
- public boolean canAcceptInputAsStream() {
- return true;
- }
-
+
+ public void initializeStreaming(Configuration cfg,
+ StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+ this.inputOI = inputOI;
+ canAcceptInputAsStream = true;
+ }
+
public List<Object> processRow(Object row) throws HiveException {
- if (!canAcceptInputAsStream() ) {
+ if (!canAcceptInputAsStream()) {
throw new HiveException(String.format(
- "Internal error: PTF %s, doesn't support Streaming",
- getClass().getName()));
+ "Internal error: PTF %s, doesn't support Streaming", getClass()
+ .getName()));
}
rows.clear();
+ row = ObjectInspectorUtils.copyToStandardObject(row, inputOI,
+ ObjectInspectorCopyOption.WRITABLE);
rows.add(row);
return rows;
}
-
+
public static class NoopWithMapStreamingResolver extends NoopWithMapResolver {
@Override
- protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+ protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+ PartitionedTableFunctionDef tDef) {
return new NoopWithMapStreaming();
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Sat May 31 18:40:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.udf.pt
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/*
@@ -91,6 +93,7 @@ public abstract class TableFunctionEvalu
protected PTFDesc ptfDesc;
boolean transformsRawInput;
transient protected PTFPartition outputPartition;
+ transient protected boolean canAcceptInputAsStream;
static {
PTFUtils.makeTransient(TableFunctionEvaluator.class, "outputOI", "rawInputOI");
@@ -215,9 +218,14 @@ public abstract class TableFunctionEvalu
* remaining o/p rows.
*/
public boolean canAcceptInputAsStream() {
- return false;
+ return canAcceptInputAsStream;
}
-
+
+ public void initializeStreaming(Configuration cfg,
+ StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+ canAcceptInputAsStream = false;
+ }
+
public void startPartition() throws HiveException {
if (!canAcceptInputAsStream() ) {
throw new HiveException(String.format(
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1598896&r1=1598895&r2=1598896&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Sat May 31 18:40:50 2014
@@ -25,9 +25,13 @@ import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -43,6 +47,9 @@ import org.apache.hadoop.hive.ql.plan.pt
import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -52,6 +59,8 @@ import org.apache.hadoop.hive.serde2.obj
@SuppressWarnings("deprecation")
public class WindowingTableFunction extends TableFunctionEvaluator {
+ StreamingState streamingState;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
@@ -133,6 +142,251 @@ public class WindowingTableFunction exte
return true;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#canAcceptInputAsStream
+ * ()
+ *
+ * WindowTableFunction supports streaming if every function meets one of these
+ * conditions: 1. The function implements ISupportStreamingModeForWindowing.
+ * 2. getWindowingEvaluator returns a non-null object that implements
+ * ISupportStreamingModeForWindowing. 3. The invocation is on a 'fixed'
+ * window, i.e., no Unbounded Preceding or Following.
+ */
+ private int[] setCanAcceptInputAsStream(Configuration cfg) {
+
+ canAcceptInputAsStream = false;
+
+ if (ptfDesc.getLlInfo().getLeadLagExprs() != null) {
+ return null;
+ }
+
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+ int precedingSpan = 0;
+ int followingSpan = 0;
+
+ for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+ WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i);
+ WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
+ GenericUDAFEvaluator fnEval = wFnDef.getWFnEval();
+ GenericUDAFEvaluator streamingEval = fnEval
+ .getWindowingEvaluator(wdwFrame);
+ if (streamingEval != null
+ && streamingEval instanceof ISupportStreamingModeForWindowing) {
+ continue;
+ }
+ BoundaryDef start = wdwFrame.getStart();
+ BoundaryDef end = wdwFrame.getEnd();
+ if (!(end instanceof ValueBoundaryDef)
+ && !(start instanceof ValueBoundaryDef)) {
+ if (end.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT
+ && start.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT
+ && end.getDirection() != Direction.PRECEDING
+ && start.getDirection() != Direction.FOLLOWING) {
+
+ int amt = wdwFrame.getStart().getAmt();
+ if (amt > precedingSpan) {
+ precedingSpan = amt;
+ }
+
+ amt = wdwFrame.getEnd().getAmt();
+ if (amt > followingSpan) {
+ followingSpan = amt;
+ }
+ continue;
+ }
+ }
+ return null;
+ }
+
+ int windowLimit = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+
+ if (windowLimit < (followingSpan + precedingSpan + 1)) {
+ return null;
+ }
+
+ canAcceptInputAsStream = true;
+ return new int[] {precedingSpan, followingSpan};
+ }
+
+ @Override
+ public void initializeStreaming(Configuration cfg,
+ StructObjectInspector inputOI, boolean isMapSide) throws HiveException {
+
+ int[] span = setCanAcceptInputAsStream(cfg);
+ if (!canAcceptInputAsStream) {
+ return;
+ }
+
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+
+ for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+ WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i);
+ WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
+ GenericUDAFEvaluator fnEval = wFnDef.getWFnEval();
+ GenericUDAFEvaluator streamingEval = fnEval
+ .getWindowingEvaluator(wdwFrame);
+ if (streamingEval != null) {
+ wFnDef.setWFnEval(streamingEval);
+ if (wFnDef.isPivotResult()) {
+ ListObjectInspector listOI = (ListObjectInspector) wFnDef.getOI();
+ wFnDef.setOI(listOI.getListElementObjectInspector());
+ }
+ }
+ }
+ streamingState = new StreamingState(cfg, inputOI, isMapSide, tabDef,
+ span[0], span[1]);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#startPartition()
+ */
+ @Override
+ public void startPartition() throws HiveException {
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+ streamingState.reset(tabDef);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#processRow(java
+ * .lang.Object)
+ *
+ * - hand the row to each function, provided there are enough rows for the
+ * function's window. - call getNextResult on each function. - output as many
+ * rows as possible, based on the minimum size of the output lists.
+ */
+ @Override
+ public List<Object> processRow(Object row) throws HiveException {
+
+ streamingState.rollingPart.append(row);
+ row = streamingState.rollingPart
+ .getAt(streamingState.rollingPart.size() - 1);
+
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+
+ for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+ WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+ GenericUDAFEvaluator fnEval = wFn.getWFnEval();
+
+ int a = 0;
+ if (wFn.getArgs() != null) {
+ for (PTFExpressionDef arg : wFn.getArgs()) {
+ streamingState.funcArgs[i][a++] = arg.getExprEvaluator().evaluate(row);
+ }
+ }
+
+ if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]);
+ Object out = ((ISupportStreamingModeForWindowing) fnEval)
+ .getNextResult(streamingState.aggBuffers[i]);
+ if (out != null) {
+ streamingState.fnOutputs[i]
+ .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
+ : out);
+ }
+ } else {
+ int rowToProcess = streamingState.rollingPart.rowToProcess(wFn);
+ if (rowToProcess >= 0) {
+ Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+ streamingState.order);
+ PTFPartitionIterator<Object> rItr = rng.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+ Object out = evaluateWindowFunction(wFn, rItr);
+ streamingState.fnOutputs[i].add(out);
+ }
+ }
+ }
+
+ List<Object> oRows = new ArrayList<Object>();
+ while (true) {
+ boolean hasRow = streamingState.hasOutputRow();
+
+ if (!hasRow) {
+ break;
+ }
+
+ oRows.add(streamingState.nextOutputRow());
+ }
+
+ return oRows.size() == 0 ? null : oRows;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#finishPartition()
+ *
+ * for functions that are not ISupportStreamingModeForWindowing, give them the
+ * remaining rows (rows whose span went beyond the end of the partition); for
+ * the rest of the functions, invoke terminate.
+ *
+ * while numOutputRows < numInputRows: for each function that doesn't have
+ * enough output, invoke getNextResult; if there is no output, flag an error.
+ */
+ @Override
+ public List<Object> finishPartition() throws HiveException {
+
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+ for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
+ WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+ GenericUDAFEvaluator fnEval = wFn.getWFnEval();
+
+ int numRowsRemaining = wFn.getWindowFrame().getEnd().getAmt();
+ if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ fnEval.terminate(streamingState.aggBuffers[i]);
+ if (numRowsRemaining != BoundarySpec.UNBOUNDED_AMOUNT) {
+ while (numRowsRemaining > 0) {
+ Object out = ((ISupportStreamingModeForWindowing) fnEval)
+ .getNextResult(streamingState.aggBuffers[i]);
+ if (out != null) {
+ streamingState.fnOutputs[i]
+ .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
+ : out);
+ }
+ numRowsRemaining--;
+ }
+ }
+ } else {
+ while (numRowsRemaining > 0) {
+ int rowToProcess = streamingState.rollingPart.size()
+ - numRowsRemaining;
+ Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+ streamingState.order);
+ PTFPartitionIterator<Object> rItr = rng.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+ Object out = evaluateWindowFunction(wFn, rItr);
+ streamingState.fnOutputs[i].add(out);
+ numRowsRemaining--;
+ }
+ }
+
+ }
+
+ List<Object> oRows = new ArrayList<Object>();
+
+ while (!streamingState.rollingPart.processedAllRows()) {
+ boolean hasRow = streamingState.hasOutputRow();
+
+ if (!hasRow) {
+ throw new HiveException(
+ "Internal Error: cannot generate all output rows for a Partition");
+ }
+ oRows.add(streamingState.nextOutputRow());
+ }
+
+ return oRows.size() == 0 ? null : oRows;
+ }
+
@Override
public boolean canIterateOutput() {
return true;
@@ -155,16 +409,19 @@ public class WindowingTableFunction exte
Object out = evaluateWindowFunction(wFn, pItr);
output.add(out);
} else if (wFn.isPivotResult()) {
- /*
- * for functions that currently return the output as a List,
- * for e.g. the ranking functions, lead/lag, ntile, cume_dist
- * - for now continue to execute them here. The functions need to provide a way to get
- * each output row as we are iterating through the input. This is relative
- * easy to do for ranking functions; not possible for lead, ntile, cume_dist.
- *
- */
- outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
- output.add(null);
+ GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame());
+ if (streamingEval instanceof ISupportStreamingModeForWindowing) {
+ wFn.setWFnEval(streamingEval);
+ if ( wFn.getOI() instanceof ListObjectInspector ) {
+ ListObjectInspector listOI = (ListObjectInspector) wFn.getOI();
+ wFn.setOI(listOI.getListElementObjectInspector());
+ }
+ output.add(null);
+ wFnsWithWindows.add(i);
+ } else {
+ outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+ output.add(null);
+ }
} else {
output.add(null);
wFnsWithWindows.add(i);
@@ -884,8 +1141,10 @@ public class WindowingTableFunction exte
Order order;
PTFDesc ptfDesc;
StructObjectInspector inputOI;
+ AggregationBuffer[] aggBuffers;
+ Object[][] args;
- WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
+ WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) {
this.iPart = iPart;
this.output = output;
@@ -896,6 +1155,18 @@ public class WindowingTableFunction exte
order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
ptfDesc = getQueryDef();
inputOI = iPart.getOutputOI();
+
+ aggBuffers = new AggregationBuffer[wTFnDef.getWindowFunctions().size()];
+ args = new Object[wTFnDef.getWindowFunctions().size()][];
+ try {
+ for (int j : wFnsToProcess) {
+ WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
+ aggBuffers[j] = wFn.getWFnEval().getNewAggregationBuffer();
+ args[j] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+ }
+ } catch (HiveException he) {
+ throw new RuntimeException(he);
+ }
}
@Override
@@ -915,10 +1186,25 @@ public class WindowingTableFunction exte
try {
for (int j : wFnsToProcess) {
WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
- Range rng = getRange(wFn, currIdx, iPart, order);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- output.set(j, evaluateWindowFunction(wFn, rItr));
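+ // Streaming evaluators consume one input row at a time and may emit a
+ // result immediately; non-streaming functions still evaluate the full
+ // window frame for the current row.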
+ if (wFn.getWFnEval() instanceof ISupportStreamingModeForWindowing) {
+ Object iRow = iPart.getAt(currIdx);
+ int a = 0;
+ if (wFn.getArgs() != null) {
+ for (PTFExpressionDef arg : wFn.getArgs()) {
+ args[j][a++] = arg.getExprEvaluator().evaluate(iRow);
+ }
+ }
+ wFn.getWFnEval().aggregate(aggBuffers[j], args[j]);
+ Object out = ((ISupportStreamingModeForWindowing) wFn.getWFnEval())
+ .getNextResult(aggBuffers[j]);
+ out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
+ output.set(j, out);
+ } else {
+ Range rng = getRange(wFn, currIdx, iPart, order);
+ PTFPartitionIterator<Object> rItr = rng.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+ output.set(j, evaluateWindowFunction(wFn, rItr));
+ }
}
Object iRow = iPart.getAt(currIdx);
@@ -942,4 +1228,72 @@ public class WindowingTableFunction exte
}
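+ /*
+ * Holds the state for one streaming partition: the rolling buffer of
+ * input rows plus, per window function, its aggregation buffer, argument
+ * scratch array, and the list of results not yet emitted.
+ */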
+ class StreamingState {
+ PTFRollingPartition rollingPart;
+ List<Object>[] fnOutputs;
+ AggregationBuffer[] aggBuffers;
+ Object[][] funcArgs;
+ Order order;
+
+ @SuppressWarnings("unchecked")
+ StreamingState(Configuration cfg, StructObjectInspector inputOI,
+ boolean isMapSide, WindowTableFunctionDef tabDef, int precedingSpan,
+ int followingSpan) throws HiveException {
+ SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde()
+ : tabDef.getRawInputShape().getSerde();
+ StructObjectInspector outputOI = isMapSide ? tabDef.getInput()
+ .getOutputShape().getOI() : tabDef.getRawInputShape().getOI();
+ rollingPart = PTFPartition.createRolling(cfg, serde, inputOI, outputOI,
+ precedingSpan, followingSpan);
+
+ order = tabDef.getOrder().getExpressions().get(0).getOrder();
+
+ int numFns = tabDef.getWindowFunctions().size();
+ fnOutputs = new ArrayList[numFns];
+
+ aggBuffers = new AggregationBuffer[numFns];
+ funcArgs = new Object[numFns][];
+ for (int i = 0; i < numFns; i++) {
+ fnOutputs[i] = new ArrayList<Object>();
+ WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+ funcArgs[i] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+ }
+ }
+
+ void reset(WindowTableFunctionDef tabDef) throws HiveException {
+ int numFns = tabDef.getWindowFunctions().size();
+ rollingPart.reset();
+ for (int i = 0; i < fnOutputs.length; i++) {
+ fnOutputs[i].clear();
+ }
+
+ for (int i = 0; i < numFns; i++) {
+ WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
+ aggBuffers[i] = wFn.getWFnEval().getNewAggregationBuffer();
+ }
+ }
+
+ boolean hasOutputRow() {
+ for (int i = 0; i < fnOutputs.length; i++) {
+ if (fnOutputs[i].size() == 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
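+ /*
+ * Output row layout: one value per window function, in declaration
+ * order, followed by the columns of the corresponding input row.
+ */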
+ List<Object> nextOutputRow() throws HiveException {
+ List<Object> oRow = new ArrayList<Object>();
+ Object iRow = rollingPart.nextOutputRow();
+ for (int i = 0; i < fnOutputs.length; i++) {
+ oRow.add(fnOutputs[i].remove(0));
+ }
+ for (StructField f : rollingPart.getOutputOI().getAllStructFieldRefs()) {
+ oRow.add(rollingPart.getOutputOI().getStructFieldData(iRow, f));
+ }
+ return oRow;
+ }
+ }
+
}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java?rev=1598896&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java Sat May 31 18:40:50 2014
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udaf;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+public class TestStreamingAvg {
+
+ public void avgDouble(Iterator<Double> inVals, int inSz, int numPreceding,
+ int numFollowing, Iterator<Double> outVals) throws HiveException {
+
+ GenericUDAFAverage fnR = new GenericUDAFAverage();
+ TypeInfo[] inputTypes = { TypeInfoFactory.doubleTypeInfo };
+ ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableDoubleObjectInspector };
+
+ DoubleWritable[] in = new DoubleWritable[1];
+ in[0] = new DoubleWritable();
+
+ TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.DoubleHandler,
+ in, inputOIs, inSz, numPreceding, numFollowing, outVals);
+
+ }
+
+ public void avgHiveDecimal(Iterator<HiveDecimal> inVals, int inSz,
+ int numPreceding, int numFollowing, Iterator<HiveDecimal> outVals)
+ throws HiveException {
+
+ GenericUDAFAverage fnR = new GenericUDAFAverage();
+ TypeInfo[] inputTypes = { TypeInfoFactory.decimalTypeInfo };
+ ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector };
+
+ HiveDecimalWritable[] in = new HiveDecimalWritable[1];
+ in[0] = new HiveDecimalWritable();
+
+ TestStreamingSum._agg(fnR, inputTypes, inVals,
+ TypeHandler.HiveDecimalHandler, in, inputOIs, inSz, numPreceding,
+ numFollowing, outVals);
+
+ }
+
+ @Test
+ public void testDouble_3_4() throws HiveException {
+
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
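+ // output[i] = avg(inVals[max(0, i-3) .. min(9, i+4)]); e.g.
+ // output[0] = (1+2+3+4+5)/5 and output[9] = (7+8+9+10)/4.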
+ List<Double> outVals = Arrays.asList(15.0 / 5, 21.0 / 6, 28.0 / 7,
+ 36.0 / 8, 44.0 / 8, 52.0 / 8, 49.0 / 7, 45.0 / 6, 40.0 / 5, 34.0 / 4);
+ avgDouble(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testHiveDecimal_3_4() throws HiveException {
+
+ List<HiveDecimal> inVals = Arrays
+ .asList(HiveDecimal.create(1L), HiveDecimal.create(2L),
+ HiveDecimal.create(3L), HiveDecimal.create(4L),
+ HiveDecimal.create(5L), HiveDecimal.create(6L),
+ HiveDecimal.create(7L), HiveDecimal.create(8L),
+ HiveDecimal.create(9L), HiveDecimal.create(10L));
+ List<HiveDecimal> outVals = Arrays.asList(
+ HiveDecimal.create(new BigDecimal(15.0 / 5)),
+ HiveDecimal.create(new BigDecimal(21.0 / 6)),
+ HiveDecimal.create(new BigDecimal(28.0 / 7)),
+ HiveDecimal.create(new BigDecimal(36.0 / 8)),
+ HiveDecimal.create(new BigDecimal(44.0 / 8)),
+ HiveDecimal.create(new BigDecimal(52.0 / 8)),
+ HiveDecimal.create(new BigDecimal(49.0 / 7)),
+ HiveDecimal.create(new BigDecimal(45.0 / 6)),
+ HiveDecimal.create(new BigDecimal(40.0 / 5)),
+ HiveDecimal.create(new BigDecimal(34.0 / 4)));
+ avgHiveDecimal(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_3_0() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
+ List<Double> outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4,
+ 14.0 / 4, 18.0 / 4, 22.0 / 4, 26.0 / 4, 30.0 / 4, 34.0 / 4);
+ avgDouble(inVals.iterator(), 10, 3, 0, outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_unb_0() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
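+ // UNBOUNDED preceding with 0 following yields a running average:
+ // output[i] = sum(inVals[0..i]) / (i + 1).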
+ List<Double> outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4,
+ 15.0 / 5, 21.0 / 6, 28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10);
+ avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0,
+ outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_0_5() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
+ List<Double> outVals = Arrays.asList(21.0 / 6, 27.0 / 6, 33.0 / 6,
+ 39.0 / 6, 45.0 / 6, 40.0 / 5, 34.0 / 4, 27.0 / 3, 19.0 / 2, 10.0 / 1);
+ avgDouble(inVals.iterator(), 10, 0, 5, outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_unb_5() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
+ List<Double> outVals = Arrays.asList(21.0 / 6, 28.0 / 7, 36.0 / 8,
+ 45.0 / 9, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10,
+ 55.0 / 10);
+ avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5,
+ outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_7_2() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
+ List<Double> outVals = Arrays.asList(6.0 / 3, 10.0 / 4, 15.0 / 5, 21.0 / 6,
+ 28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10, 54.0 / 9, 52.0 / 8);
+ avgDouble(inVals.iterator(), 10, 7, 2, outVals.iterator());
+ }
+
+ @Test
+ public void testDouble_15_15() throws HiveException {
+ List<Double> inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
+ 9.0, 10.0);
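+ // Both spans exceed the partition size, so every frame covers all 10
+ // rows and every output is 55.0 / 10.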
+ List<Double> outVals = Arrays.asList(55.0 / 10, 55.0 / 10, 55.0 / 10,
+ 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10,
+ 55.0 / 10);
+ avgDouble(inVals.iterator(), 10, 15, 15, outVals.iterator());
+ }
+}