You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2017/01/23 14:32:53 UTC
hive git commit: HIVE-15617: Improve the avg performance for Range
based window (Aihua Xu, reviewed by Yongzhi Chen)
Repository: hive
Updated Branches:
refs/heads/master f6b26b9e4 -> 7c57c05cd
HIVE-15617: Improve the avg performance for Range based window (Aihua Xu, reviewed by Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7c57c05c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7c57c05c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7c57c05c
Branch: refs/heads/master
Commit: 7c57c05cd6e152fc361131af41ceae0f5ca8c230
Parents: f6b26b9
Author: Aihua Xu <ai...@apache.org>
Authored: Fri Jan 13 11:48:28 2017 -0500
Committer: Aihua Xu <ai...@apache.org>
Committed: Mon Jan 23 09:30:16 2017 -0500
----------------------------------------------------------------------
.../hive/ql/udf/generic/GenericUDAFAverage.java | 37 ++-
.../hive/ql/udf/ptf/BasePartitionEvaluator.java | 259 +++++++++++++++----
2 files changed, 240 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7c57c05c/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 5ad5c06..a28f7e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -19,18 +19,21 @@ package org.apache.hadoop.hive.ql.udf.generic;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationType;
+import org.apache.hadoop.hive.ql.udf.ptf.BasePartitionEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -219,6 +222,19 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
};
}
+
+ @Override
+ protected BasePartitionEvaluator createPartitionEvaluator(
+ WindowFrameDef winFrame,
+ PTFPartition partition,
+ List<PTFExpressionDef> parameters,
+ ObjectInspector outputOI) {
+ try {
+ return new BasePartitionEvaluator.AvgPartitionDoubleEvaluator(this, winFrame, partition, parameters, inputOI, outputOI);
+ } catch(HiveException e) {
+ return super.createPartitionEvaluator(winFrame, partition, parameters, outputOI);
+ }
+ }
}
public static class GenericUDAFAverageEvaluatorDecimal extends AbstractGenericUDAFAverageEvaluator<HiveDecimal> {
@@ -358,6 +374,19 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
};
}
+
+ @Override
+ protected BasePartitionEvaluator createPartitionEvaluator(
+ WindowFrameDef winFrame,
+ PTFPartition partition,
+ List<PTFExpressionDef> parameters,
+ ObjectInspector outputOI) {
+ try {
+ return new BasePartitionEvaluator.AvgPartitionHiveDecimalEvaluator(this, winFrame, partition, parameters, inputOI, outputOI);
+ } catch(HiveException e) {
+ return super.createPartitionEvaluator(winFrame, partition, parameters, outputOI);
+ }
+ }
}
@AggregationType(estimable = true)
@@ -409,6 +438,8 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
super.init(m, parameters);
// init input
+ partialResult = new Object[2];
+ partialResult[0] = new LongWritable(0);
if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
copiedOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
@@ -436,8 +467,6 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
fname.add("count");
fname.add("sum");
fname.add("input");
- partialResult = new Object[2];
- partialResult[0] = new LongWritable(0);
// index 1 set by child
return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
} else {
@@ -445,7 +474,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
}
}
- protected boolean isWindowingDistinct() {
+ public boolean isWindowingDistinct() {
return isWindowing && avgDistinct;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7c57c05c/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java
index f5f9f7b..86954ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.udf.ptf;
import java.util.List;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage.AbstractGenericUDAFAverageEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
@@ -84,6 +86,96 @@ public class BasePartitionEvaluator {
}
}
+ /**
+ * Defines type-specific operations to be used in the subclasses
+ */
+ private static abstract class TypeOperationBase<ResultType> {
+ public abstract ResultType add(ResultType t1, ResultType t2);
+ public abstract ResultType minus(ResultType t1, ResultType t2);
+ public abstract ResultType div(ResultType sum, long numRows);
+ }
+
+ private static class TypeOperationLongWritable extends TypeOperationBase<LongWritable> {
+ @Override
+ public LongWritable add(LongWritable t1, LongWritable t2) {
+ if (t1 == null && t2 == null) return null;
+ return new LongWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get()));
+ }
+
+ @Override
+ public LongWritable minus(LongWritable t1, LongWritable t2) {
+ if (t1 == null && t2 == null) return null;
+ return new LongWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get()));
+ }
+
+ @Override
+ public LongWritable div(LongWritable sum, long numRows) {
+ return null; // Not used
+ }
+ }
+
+ private static class TypeOperationDoubleWritable extends TypeOperationBase<DoubleWritable> {
+ @Override
+ public DoubleWritable add(DoubleWritable t1, DoubleWritable t2) {
+ if (t1 == null && t2 == null) return null;
+ return new DoubleWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get()));
+ }
+
+ public DoubleWritable minus(DoubleWritable t1, DoubleWritable t2) {
+ if (t1 == null && t2 == null) return null;
+ return new DoubleWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get()));
+ }
+
+ @Override
+ public DoubleWritable div(DoubleWritable sum, long numRows) {
+ if (sum == null || numRows == 0) return null;
+
+ return new DoubleWritable(sum.get() / (double)numRows);
+ }
+ }
+
+ private static class TypeOperationHiveDecimalWritable extends TypeOperationBase<HiveDecimalWritable> {
+ @Override
+ public HiveDecimalWritable div(HiveDecimalWritable sum, long numRows) {
+ if (sum == null || numRows == 0) return null;
+
+ HiveDecimalWritable result = new HiveDecimalWritable(sum);
+ result.mutateDivide(HiveDecimal.create(numRows));
+ return result;
+ }
+
+ @Override
+ public HiveDecimalWritable add(HiveDecimalWritable t1, HiveDecimalWritable t2) {
+ if (t1 == null && t2 == null) return null;
+
+ if (t1 == null) {
+ return new HiveDecimalWritable(t2);
+ } else {
+ HiveDecimalWritable result = new HiveDecimalWritable(t1);
+ if (t2 != null) {
+ result.mutateAdd(t2);
+ }
+ return result;
+ }
+ }
+
+ @Override
+ public HiveDecimalWritable minus(HiveDecimalWritable t1, HiveDecimalWritable t2) {
+ if (t1 == null && t2 == null) return null;
+
+ if (t2 == null) {
+ return new HiveDecimalWritable(t1);
+ } else {
+ HiveDecimalWritable result = new HiveDecimalWritable(t2);
+ result.mutateNegate();
+ if (t1 != null) {
+ result.mutateAdd(t1);
+ }
+ return result;
+ }
+ }
+ }
+
public BasePartitionEvaluator(
GenericUDAFEvaluator wrappedEvaluator,
WindowFrameDef winFrame,
@@ -217,7 +309,14 @@ public class BasePartitionEvaluator {
*
*/
public static abstract class SumPartitionEvaluator<ResultType extends Writable> extends BasePartitionEvaluator {
+ static class WindowSumAgg<ResultType> extends AbstractAggregationBuffer {
+ Range prevRange;
+ ResultType prevSum;
+ boolean empty;
+ }
+
protected final WindowSumAgg<ResultType> sumAgg;
+ protected TypeOperationBase<ResultType> typeOperation;
public SumPartitionEvaluator(
GenericUDAFEvaluator wrappedEvaluator,
@@ -229,15 +328,6 @@ public class BasePartitionEvaluator {
sumAgg = new WindowSumAgg<ResultType>();
}
- static class WindowSumAgg<ResultType> extends AbstractAggregationBuffer {
- Range prevRange;
- ResultType prevSum;
- boolean empty;
- }
-
- public abstract ResultType add(ResultType t1, ResultType t2);
- public abstract ResultType minus(ResultType t1, ResultType t2);
-
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException {
@@ -262,7 +352,8 @@ public class BasePartitionEvaluator {
Range r2 = new Range(sumAgg.prevRange.end, currentRange.end, partition);
ResultType sum1 = (ResultType)calcFunctionValue(r1.iterator(), leadLagInfo);
ResultType sum2 = (ResultType)calcFunctionValue(r2.iterator(), leadLagInfo);
- result = add(minus(sumAgg.prevSum, sum1), sum2);
+ result = typeOperation.add(typeOperation.minus(sumAgg.prevSum, sum1), sum2);
+
sumAgg.prevRange = currentRange;
sumAgg.prevSum = result;
}
@@ -276,18 +367,7 @@ public class BasePartitionEvaluator {
WindowFrameDef winFrame, PTFPartition partition,
List<PTFExpressionDef> parameters, ObjectInspector outputOI) {
super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
- }
-
- @Override
- public DoubleWritable add(DoubleWritable t1, DoubleWritable t2) {
- if (t1 == null && t2 == null) return null;
- return new DoubleWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get()));
- }
-
- @Override
- public DoubleWritable minus(DoubleWritable t1, DoubleWritable t2) {
- if (t1 == null && t2 == null) return null;
- return new DoubleWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get()));
+ this.typeOperation = new TypeOperationDoubleWritable();
}
}
@@ -296,18 +376,7 @@ public class BasePartitionEvaluator {
WindowFrameDef winFrame, PTFPartition partition,
List<PTFExpressionDef> parameters, ObjectInspector outputOI) {
super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
- }
-
- @Override
- public LongWritable add(LongWritable t1, LongWritable t2) {
- if (t1 == null && t2 == null) return null;
- return new LongWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get()));
- }
-
- @Override
- public LongWritable minus(LongWritable t1, LongWritable t2) {
- if (t1 == null && t2 == null) return null;
- return new LongWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get()));
+ this.typeOperation = new TypeOperationLongWritable();
}
}
@@ -316,33 +385,119 @@ public class BasePartitionEvaluator {
WindowFrameDef winFrame, PTFPartition partition,
List<PTFExpressionDef> parameters, ObjectInspector outputOI) {
super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
+ this.typeOperation = new TypeOperationHiveDecimalWritable();
}
+ }
- @Override
- public HiveDecimalWritable add(HiveDecimalWritable t1, HiveDecimalWritable t2) {
- if (t1 == null && t2 == null) return null;
- if (t1 == null) {
- return t2;
- } else {
- if (t2 != null) {
- t1.mutateAdd(t2);
+ /**
+ * The partition evaluator for the average function
+ * @param <ResultType>
+ */
+ public static abstract class AvgPartitionEvaluator<ResultType extends Writable>
+ extends BasePartitionEvaluator {
+ static class WindowAvgAgg<ResultType> extends AbstractAggregationBuffer {
+ Range prevRange;
+ ResultType prevSum;
+ long prevCount;
+ boolean empty;
+ }
+
+ protected SumPartitionEvaluator<ResultType> sumEvaluator;
+ protected TypeOperationBase<ResultType> typeOperation;
+ WindowAvgAgg<ResultType> avgAgg = new WindowAvgAgg<ResultType>();
+
+ public AvgPartitionEvaluator(
+ GenericUDAFEvaluator wrappedEvaluator,
+ WindowFrameDef winFrame,
+ PTFPartition partition,
+ List<PTFExpressionDef> parameters,
+ ObjectInspector outputOI) {
+ super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
+ }
+
+ /**
+ * Calculate the partial result sum + count given a partition range
+ * @return a 2-element Object array of [count long, sum ResultType]
+ */
+ private Object[] calcPartialResult(PTFPartitionIterator<Object> pItr, LeadLagInfo leadLagInfo)
+ throws HiveException {
+ // To handle the case like SUM(LAG(f)) over(), aggregation function includes
+ // LAG/LEAD call
+ PTFOperator.connectLeadLagFunctionsToPartition(leadLagInfo, pItr);
+
+ AggregationBuffer aggBuffer = wrappedEvaluator.getNewAggregationBuffer();
+ Object[] argValues = new Object[parameters == null ? 0 : parameters.size()];
+ while(pItr.hasNext())
+ {
+ Object row = pItr.next();
+ int i = 0;
+ if ( parameters != null ) {
+ for(PTFExpressionDef param : parameters)
+ {
+ argValues[i++] = param.getExprEvaluator().evaluate(row);
+ }
}
- return t1;
+ wrappedEvaluator.aggregate(aggBuffer, argValues);
}
+
+ // The object [count LongWritable, sum ResultType] is reused during evaluating
+ Object[] partial = (Object[])wrappedEvaluator.terminatePartial(aggBuffer);
+ return new Object[] {((LongWritable)partial[0]).get(), ObjectInspectorUtils.copyToStandardObject(partial[1], outputOI)};
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public HiveDecimalWritable minus(HiveDecimalWritable t1, HiveDecimalWritable t2) {
- if (t1 == null && t2 == null) return null;
- if (t1 == null) {
- t2.mutateNegate();
- return t2;
+ public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException {
+ // Currently avg(distinct) is not supported in PartitionEvaluator
+ if (((AbstractGenericUDAFAverageEvaluator)wrappedEvaluator).isWindowingDistinct()) {
+ return super.iterate(currentRow, leadLagInfo);
+ }
+
+ Range currentRange = getRange(winFrame, currentRow, partition);
+ if (currentRow == 0 || // Reset for the new partition
+ avgAgg.prevRange == null ||
+ currentRange.getSize() <= currentRange.getDiff(avgAgg.prevRange)) {
+ Object[] partial = (Object[])calcPartialResult(currentRange.iterator(), leadLagInfo);
+ avgAgg.prevRange = currentRange;
+ avgAgg.empty = false;
+ avgAgg.prevSum = (ResultType)partial[1];
+ avgAgg.prevCount = (long)partial[0];
} else {
- if (t2 != null) {
- t1.mutateSubtract(t2);
- }
- return t1;
+ // Given the previous range and the current range, calculate the new sum
+ // from the previous sum and the difference to save the computation.
+ Range r1 = new Range(avgAgg.prevRange.start, currentRange.start, partition);
+ Range r2 = new Range(avgAgg.prevRange.end, currentRange.end, partition);
+ Object[] partial1 = (Object[])calcPartialResult(r1.iterator(), leadLagInfo);
+ Object[] partial2 = (Object[])calcPartialResult(r2.iterator(), leadLagInfo);
+ ResultType sum = typeOperation.add(typeOperation.minus(avgAgg.prevSum, (ResultType)partial1[1]), (ResultType)partial2[1]);
+ long count = avgAgg.prevCount - (long)partial1[0]+ (long)partial2[0];
+
+ avgAgg.prevRange = currentRange;
+ avgAgg.prevSum = sum;
+ avgAgg.prevCount = count;
}
+
+ return typeOperation.div(avgAgg.prevSum, avgAgg.prevCount);
+ }
+ }
+
+ public static class AvgPartitionDoubleEvaluator extends AvgPartitionEvaluator<DoubleWritable> {
+
+ public AvgPartitionDoubleEvaluator(GenericUDAFEvaluator wrappedEvaluator,
+ WindowFrameDef winFrame, PTFPartition partition,
+ List<PTFExpressionDef> parameters, ObjectInspector inputOI, ObjectInspector outputOI) throws HiveException {
+ super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
+ this.typeOperation = new TypeOperationDoubleWritable();
+ }
+ }
+
+ public static class AvgPartitionHiveDecimalEvaluator extends AvgPartitionEvaluator<HiveDecimalWritable> {
+
+ public AvgPartitionHiveDecimalEvaluator(GenericUDAFEvaluator wrappedEvaluator,
+ WindowFrameDef winFrame, PTFPartition partition,
+ List<PTFExpressionDef> parameters, ObjectInspector inputOI, ObjectInspector outputOI) throws HiveException {
+ super(wrappedEvaluator, winFrame, partition, parameters, outputOI);
+ this.typeOperation = new TypeOperationHiveDecimalWritable();
}
}
}
\ No newline at end of file