You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2016/05/29 00:37:30 UTC
hive git commit: HIVE-13453: Support ORDER BY and windowing clause in
partitioning clause with distinct function (Reviewed by Yongzhi Chen)
Repository: hive
Updated Branches:
refs/heads/master e1e68b29a -> 15bdce43d
HIVE-13453: Support ORDER BY and windowing clause in partitioning clause with distinct function (Reviewed by Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bdce43
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bdce43
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bdce43
Branch: refs/heads/master
Commit: 15bdce43db4624a63be1f648e46d1f2baa1c67de
Parents: e1e68b2
Author: Aihua Xu <ai...@apache.org>
Authored: Fri May 6 11:00:20 2016 -0400
Committer: Aihua Xu <ai...@apache.org>
Committed: Sat May 28 20:36:59 2016 -0400
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FunctionRegistry.java | 2 +-
.../apache/hadoop/hive/ql/exec/Registry.java | 8 +-
.../hadoop/hive/ql/parse/WindowingSpec.java | 14 --
.../hive/ql/plan/ptf/WindowFunctionDef.java | 2 +-
.../hive/ql/udf/generic/GenericUDAFAverage.java | 68 ++++++++--
.../hive/ql/udf/generic/GenericUDAFCount.java | 57 +++++---
.../udf/generic/GenericUDAFParameterInfo.java | 7 +
.../hive/ql/udf/generic/GenericUDAFSum.java | 134 +++++++++++++------
.../generic/SimpleGenericUDAFParameterInfo.java | 9 +-
.../hive/ql/udf/ptf/WindowingTableFunction.java | 9 +-
.../queries/clientpositive/windowing_distinct.q | 18 +++
.../clientpositive/windowing_distinct.q.out | 66 +++++++++
.../objectinspector/ObjectInspectorUtils.java | 38 ++++++
13 files changed, 333 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index fa90242..8217ad3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -902,7 +902,7 @@ public final class FunctionRegistry {
GenericUDAFParameterInfo paramInfo =
new SimpleGenericUDAFParameterInfo(
- args, isDistinct, isAllColumns);
+ args, false, isDistinct, isAllColumns);
GenericUDAFEvaluator udafEvaluator;
if (udafResolver instanceof GenericUDAFResolver2) {
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index 891514b..86df74d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -395,7 +395,7 @@ public class Registry {
*/
@SuppressWarnings("deprecation")
public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
- List<ObjectInspector> argumentOIs, boolean isDistinct,
+ List<ObjectInspector> argumentOIs, boolean isWindowing, boolean isDistinct,
boolean isAllColumns) throws SemanticException {
GenericUDAFResolver udafResolver = getGenericUDAFResolver(name);
@@ -413,7 +413,7 @@ public class Registry {
GenericUDAFParameterInfo paramInfo =
new SimpleGenericUDAFParameterInfo(
- args, isDistinct, isAllColumns);
+ args, isWindowing, isDistinct, isAllColumns);
if (udafResolver instanceof GenericUDAFResolver2) {
udafEvaluator =
((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
@@ -433,14 +433,14 @@ public class Registry {
}
if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) &&
!functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) {
- return getGenericUDAFEvaluator(functionName, argumentOIs, isDistinct, isAllColumns);
+ return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns);
}
// this must be lead/lag UDAF
ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
GenericUDAFResolver udafResolver = info.getGenericUDAFResolver();
GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
- argumentOIs.toArray(args), isDistinct, isAllColumns);
+ argumentOIs.toArray(args), true, isDistinct, isAllColumns);
return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
index 5ce7200..ef5186a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
@@ -124,9 +124,6 @@ public class WindowingSpec {
WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
WindowSpec wdwSpec = wFn.getWindowSpec();
- // 0. Precheck supported syntax
- precheckSyntax(wFn, wdwSpec);
-
// 1. For Wdw Specs that refer to Window Defns, inherit missing components
if ( wdwSpec != null ) {
ArrayList<String> sources = new ArrayList<String>();
@@ -153,14 +150,6 @@ public class WindowingSpec {
}
}
- private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException {
- if (wdwSpec != null ) {
- if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() != null) ) {
- throw new SemanticException("Function with DISTINCT cannot work with partition ORDER BY or windowing clause.");
- }
- }
- }
-
private void fillInWindowSpec(String sourceId, WindowSpec dest, ArrayList<String> visited)
throws SemanticException
{
@@ -509,9 +498,6 @@ public class WindowingSpec {
if ( getOrder() == null ) {
OrderSpec order = new OrderSpec();
order.prefixBy(getPartition());
- if (wFn.isDistinct) {
- order.addExpressions(wFn.getArgs());
- }
setOrder(order);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
index ed6c671..84ac614 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
@@ -124,4 +124,4 @@ public class WindowFunctionDef extends WindowExpressionDef {
this.pivotResult = pivotResult;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 3c1ce26..6799978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.udf.generic;
import java.util.ArrayList;
+import java.util.HashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
@@ -106,6 +108,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
AbstractGenericUDAFAverageEvaluator eval =
(AbstractGenericUDAFAverageEvaluator) getEvaluator(paramInfo.getParameters());
eval.avgDistinct = paramInfo.isDistinct();
+ eval.isWindowing = paramInfo.isWindowing();
return eval;
}
@@ -115,7 +118,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
public void doReset(AverageAggregationBuffer<Double> aggregation) throws HiveException {
aggregation.count = 0;
aggregation.sum = new Double(0);
- aggregation.previousValue = null;
+ aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
}
@Override
@@ -145,6 +148,12 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
}
@Override
+ protected void doMergeAdd(Double sum,
+ ObjectInspectorObject obj) {
+ sum += PrimitiveObjectInspectorUtils.getDouble(obj.getValues()[0], copiedOI);
+ }
+
+ @Override
protected void doTerminatePartial(AverageAggregationBuffer<Double> aggregation) {
if(partialResult[1] == null) {
partialResult[1] = new DoubleWritable(0);
@@ -172,6 +181,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this, wFrameDef) {
@@ -212,6 +225,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
public void doReset(AverageAggregationBuffer<HiveDecimal> aggregation) throws HiveException {
aggregation.count = 0;
aggregation.sum = HiveDecimal.ZERO;
+ aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
}
@Override
@@ -263,6 +277,14 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
}
}
+
+ @Override
+ protected void doMergeAdd(
+ HiveDecimal sum,
+ ObjectInspectorObject obj) {
+ sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(obj.getValues()[0], copiedOI));
+ }
+
@Override
protected void doTerminatePartial(AverageAggregationBuffer<HiveDecimal> aggregation) {
if(partialResult[1] == null && aggregation.sum != null) {
@@ -296,6 +318,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
this, wFrameDef) {
@@ -333,18 +359,18 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
}
private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {
- private Object previousValue;
+ private HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
private long count;
private TYPE sum;
};
@SuppressWarnings("unchecked")
public static abstract class AbstractGenericUDAFAverageEvaluator<TYPE> extends GenericUDAFEvaluator {
+ protected boolean isWindowing;
protected boolean avgDistinct;
-
// For PARTIAL1 and COMPLETE
protected transient PrimitiveObjectInspector inputOI;
- protected transient ObjectInspector copiedOI;
+ protected transient PrimitiveObjectInspector copiedOI;
// For PARTIAL2 and FINAL
private transient StructObjectInspector soi;
private transient StructField countField;
@@ -363,6 +389,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
PrimitiveObjectInspector inputOI, Object parameter);
protected abstract void doMerge(AverageAggregationBuffer<TYPE> aggregation, Long partialCount,
ObjectInspector sumFieldOI, Object partialSum);
+ protected abstract void doMergeAdd(TYPE sum, ObjectInspectorObject obj);
protected abstract void doTerminatePartial(AverageAggregationBuffer<TYPE> aggregation);
protected abstract Object doTerminate(AverageAggregationBuffer<TYPE> aggregation);
protected abstract void doReset(AverageAggregationBuffer<TYPE> aggregation) throws HiveException;
@@ -376,7 +403,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
// init input
if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
- copiedOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ copiedOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
ObjectInspectorCopyOption.JAVA);
} else {
soi = (StructObjectInspector) parameters[0];
@@ -410,6 +437,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
}
}
+ protected boolean isWindowingDistinct() {
+ return isWindowing && avgDistinct;
+ }
+
@AggregationType(estimable = true)
static class AverageAgg extends AbstractAggregationBuffer {
long count;
@@ -432,12 +463,15 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
AverageAggregationBuffer<TYPE> averageAggregation = (AverageAggregationBuffer<TYPE>) aggregation;
try {
// Skip the same value if avgDistinct is true
- if (this.avgDistinct &&
- ObjectInspectorUtils.compare(parameter, inputOI, averageAggregation.previousValue, copiedOI) == 0) {
- return;
+ if (isWindowingDistinct()) {
+ ObjectInspectorObject obj = new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(parameter, inputOI, ObjectInspectorCopyOption.JAVA),
+ copiedOI);
+ if (averageAggregation.uniqueObjects.contains(obj)) {
+ return;
+ }
+ averageAggregation.uniqueObjects.add(obj);
}
- averageAggregation.previousValue = ObjectInspectorUtils.copyToStandardObject(
- parameter, inputOI, ObjectInspectorCopyOption.JAVA);
doIterate(averageAggregation, inputOI, parameter);
} catch (NumberFormatException e) {
@@ -451,6 +485,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
@Override
public Object terminatePartial(AggregationBuffer aggregation) throws HiveException {
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ }
+
doTerminatePartial((AverageAggregationBuffer<TYPE>) aggregation);
return partialResult;
}
@@ -459,9 +497,13 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
public void merge(AggregationBuffer aggregation, Object partial)
throws HiveException {
if (partial != null) {
- doMerge((AverageAggregationBuffer<TYPE>)aggregation,
- countFieldOI.get(soi.getStructFieldData(partial, countField)),
- sumFieldOI, soi.getStructFieldData(partial, sumField));
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ doMerge((AverageAggregationBuffer<TYPE>)aggregation,
+ countFieldOI.get(soi.getStructFieldData(partial, countField)),
+ sumFieldOI, soi.getStructFieldData(partial, sumField));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index 2825045..d1d0131 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.udf.generic;
+import java.util.HashSet;
+
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -70,6 +73,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
}
GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator();
+ countEvaluator.setWindowing(paramInfo.isWindowing());
countEvaluator.setCountAllColumns(paramInfo.isAllColumns());
countEvaluator.setCountDistinct(paramInfo.isDistinct());
@@ -81,6 +85,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
*
*/
public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
+ private boolean isWindowing = false;
private boolean countAllColumns = false;
private boolean countDistinct = false;
private LongObjectInspector partialCountAggOI;
@@ -99,9 +104,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
ObjectInspectorCopyOption.JAVA);
}
result = new LongWritable(0);
+
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
+ public void setWindowing(boolean isWindowing) {
+ this.isWindowing = isWindowing;
+ }
+
private void setCountAllColumns(boolean countAllCols) {
countAllColumns = countAllCols;
}
@@ -110,10 +120,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
this.countDistinct = countDistinct;
}
+ private boolean isWindowingDistinct() {
+ return isWindowing && countDistinct;
+ }
+
/** class for storing count value. */
@AggregationType(estimable = true)
static class CountAgg extends AbstractAggregationBuffer {
- Object[] prevColumns = null; // Column values from previous row. Used to compare with current row for the case of COUNT(DISTINCT).
+ HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows
long value;
@Override
public int estimate() { return JavaDataModel.PRIMITIVES2; }
@@ -128,8 +142,8 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
@Override
public void reset(AggregationBuffer agg) throws HiveException {
- ((CountAgg) agg).prevColumns = null;
((CountAgg) agg).value = 0;
+ ((CountAgg) agg).uniqueObjects = new HashSet<ObjectInspectorObject>();
}
@Override
@@ -151,19 +165,16 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
}
}
- // Skip the counting if the values are the same for COUNT(DISTINCT) case
- if (countThisRow && countDistinct) {
- Object[] prevColumns = ((CountAgg) agg).prevColumns;
- if (prevColumns == null) {
- ((CountAgg) agg).prevColumns = new Object[parameters.length];
- } else if (ObjectInspectorUtils.compare(parameters, inputOI, prevColumns, outputOI) == 0) {
- countThisRow = false;
- }
-
- // We need to keep a copy of values from previous row.
- if (countThisRow) {
- ((CountAgg) agg).prevColumns = ObjectInspectorUtils.copyToStandardObject(
- parameters, inputOI, ObjectInspectorCopyOption.JAVA);
+ // Skip the counting if the values are the same for the windowing COUNT(DISTINCT) case
+ if (countThisRow && isWindowingDistinct()) {
+ HashSet<ObjectInspectorObject> uniqueObjs = ((CountAgg) agg).uniqueObjects;
+ ObjectInspectorObject obj = new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(parameters, inputOI, ObjectInspectorCopyOption.JAVA),
+ outputOI);
+ if (!uniqueObjs.contains(obj)) {
+ uniqueObjs.add(obj);
+ } else {
+ countThisRow = false;
}
}
@@ -177,8 +188,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
- long p = partialCountAggOI.get(partial);
- ((CountAgg) agg).value += p;
+ CountAgg countAgg = (CountAgg) agg;
+
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ long p = partialCountAggOI.get(partial);
+ countAgg.value += p;
+ }
}
}
@@ -190,7 +207,11 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ return terminate(agg);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
index 6a62d7c..675d9f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
@@ -67,6 +67,13 @@ public interface GenericUDAFParameterInfo {
boolean isDistinct();
/**
+ * Indicates whether the UDAF invocation originated from a windowing function
+ * call.
+ * @return <tt>true</tt> if the UDAF invocation was from the windowing function
+ * call.
+ */
+ boolean isWindowing();
+ /**
* Returns <tt>true</tt> if the UDAF invocation was done via the wildcard
* syntax <tt>FUNCTION(*)</tt>. Note that this is provided for informational
* purposes only and the function implementation is not expected to ensure
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 7b1d6e5..f53554c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.udf.generic;
+import java.util.HashSet;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
/**
@@ -93,6 +97,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
TypeInfo[] parameters = info.getParameters();
GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) getEvaluator(parameters);
+ eval.setWindowing(info.isWindowing());
eval.setSumDistinct(info.isDistinct());
return eval;
@@ -125,44 +130,69 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
* The base type for sum operator evaluator
*
*/
- public static abstract class GenericUDAFSumEvaluator<ResultType> extends GenericUDAFEvaluator {
+ public static abstract class GenericUDAFSumEvaluator<ResultType extends Writable> extends GenericUDAFEvaluator {
static abstract class SumAgg<T> extends AbstractAggregationBuffer {
boolean empty;
T sum;
- Object previousValue = null;
+ HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
}
protected PrimitiveObjectInspector inputOI;
- protected ObjectInspector outputOI;
+ protected PrimitiveObjectInspector outputOI;
protected ResultType result;
+ protected boolean isWindowing;
protected boolean sumDistinct;
- public boolean sumDistinct() {
- return sumDistinct;
+ public void setWindowing(boolean isWindowing) {
+ this.isWindowing = isWindowing;
}
public void setSumDistinct(boolean sumDistinct) {
this.sumDistinct = sumDistinct;
}
+ protected boolean isWindowingDistinct() {
+ return isWindowing && sumDistinct;
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ return terminate(agg);
+ }
+ }
+
/**
- * Check if the input object is the same as the previous one for the case of
- * SUM(DISTINCT).
+ * Check if the input object is eligible to contribute to the sum: skip it
+ * when it is null, or when it duplicates a previously seen value in the
+ * SUM(DISTINCT) case.
* @param input the input object
- * @return True if sumDistinct is false or the input is different from the previous object
+ * @return True if sumDistinct is false or the non-null input is different from the previous object
*/
- protected boolean checkDistinct(SumAgg agg, Object input) {
- if (this.sumDistinct &&
- ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, outputOI) == 0) {
+ protected boolean isEligibleValue(SumAgg agg, Object input) {
+ if (input == null) {
return false;
}
- agg.previousValue = ObjectInspectorUtils.copyToStandardObject(
- input, inputOI, ObjectInspectorCopyOption.JAVA);
- return true;
- }
+ if (isWindowingDistinct()) {
+ HashSet<ObjectInspectorObject> uniqueObjs = agg.uniqueObjects;
+ ObjectInspectorObject obj = input instanceof ObjectInspectorObject ?
+ (ObjectInspectorObject)input :
+ new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(input, inputOI, ObjectInspectorCopyOption.JAVA),
+ outputOI);
+ if (!uniqueObjs.contains(obj)) {
+ uniqueObjs.add(obj);
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
}
/**
@@ -177,7 +207,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
super.init(m, parameters);
result = new HiveDecimalWritable(HiveDecimal.ZERO);
inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(inputOI,
ObjectInspectorCopyOption.JAVA);
// The output precision is 10 greater than the input which should cover at least
// 10b rows. The scale is the same as the input.
@@ -208,6 +238,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg;
bdAgg.empty = true;
bdAgg.sum = HiveDecimal.ZERO;
+ bdAgg.uniqueObjects = new HashSet<ObjectInspectorObject>();
}
boolean warned = false;
@@ -216,8 +247,10 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
assert (parameters.length == 1);
try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumHiveDecimalAgg) agg, parameters[0])) {
+ ((SumHiveDecimalAgg)agg).empty = false;
+ ((SumHiveDecimalAgg)agg).sum = ((SumHiveDecimalAgg)agg).sum.add(
+ PrimitiveObjectInspectorUtils.getHiveDecimal(parameters[0], inputOI));
}
} catch (NumberFormatException e) {
if (!warned) {
@@ -232,11 +265,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
}
@Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) agg;
@@ -245,7 +273,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
}
myagg.empty = false;
- myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+ }
}
}
@@ -261,6 +293,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (sumDistinct) {
+ return null;
+ }
+
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
this, wFrameDef) {
@@ -301,7 +338,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
super.init(m, parameters);
result = new DoubleWritable(0);
inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
ObjectInspectorCopyOption.JAVA);
return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
@@ -325,6 +362,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
SumDoubleAgg myagg = (SumDoubleAgg) agg;
myagg.empty = true;
myagg.sum = 0.0;
+ myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
}
boolean warned = false;
@@ -333,8 +371,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
assert (parameters.length == 1);
try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumDoubleAgg) agg, parameters[0])) {
+ ((SumDoubleAgg)agg).empty = false;
+ ((SumDoubleAgg)agg).sum += PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI);
}
} catch (NumberFormatException e) {
if (!warned) {
@@ -349,16 +388,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
}
@Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
SumDoubleAgg myagg = (SumDoubleAgg) agg;
myagg.empty = false;
- myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+ }
}
}
@@ -374,6 +412,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (sumDistinct) {
+ return null;
+ }
+
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this,
wFrameDef) {
@@ -415,7 +458,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
super.init(m, parameters);
result = new LongWritable(0);
inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
ObjectInspectorCopyOption.JAVA);
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
@@ -439,6 +482,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
SumLongAgg myagg = (SumLongAgg) agg;
myagg.empty = true;
myagg.sum = 0L;
+ myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
}
private boolean warned = false;
@@ -447,8 +491,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
assert (parameters.length == 1);
try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumLongAgg) agg, parameters[0])) {
+ ((SumLongAgg)agg).empty = false;
+ ((SumLongAgg)agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
}
} catch (NumberFormatException e) {
if (!warned) {
@@ -460,16 +505,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
}
@Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
SumLongAgg myagg = (SumLongAgg) agg;
- myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
myagg.empty = false;
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
+ }
}
}
@@ -485,6 +529,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }
+
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this,
wFrameDef) {
@@ -509,7 +558,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
return myagg.empty ? null : new Long(myagg.sum);
}
-
};
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
index 1a1b570..728964d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
@@ -29,12 +29,14 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo
{
private final ObjectInspector[] parameters;
+ private final boolean isWindowing;
private final boolean distinct;
private final boolean allColumns;
- public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean distinct,
+ public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct,
boolean allColumns) {
this.parameters = params;
+ this.isWindowing = isWindowing;
this.distinct = distinct;
this.allColumns = allColumns;
}
@@ -63,4 +65,9 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo
public boolean isAllColumns() {
return allColumns;
}
+
+ @Override
+ public boolean isWindowing() {
+ return isWindowing;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 858b47a..b89c14e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer;
import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -392,8 +393,6 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
streamingState.rollingPart.append(row);
- row = streamingState.rollingPart
- .getAt(streamingState.rollingPart.size() - 1);
WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
@@ -408,7 +407,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
}
- if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ if (fnEval != null &&
+ fnEval instanceof ISupportStreamingModeForWindowing) {
fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]);
Object out = ((ISupportStreamingModeForWindowing) fnEval)
.getNextResult(streamingState.aggBuffers[i]);
@@ -472,7 +472,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
GenericUDAFEvaluator fnEval = wFn.getWFnEval();
int numRowsRemaining = wFn.getWindowFrame().getEnd().getRelativeOffset();
- if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ if (fnEval != null &&
+ fnEval instanceof ISupportStreamingModeForWindowing) {
fnEval.terminate(streamingState.aggBuffers[i]);
WindowingFunctionInfoHelper wFnInfo = getWindowingFunctionInfoHelper(wFn.getName());
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q
index bb192a7..6b49978 100644
--- a/ql/src/test/queries/clientpositive/windowing_distinct.q
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -44,3 +44,21 @@ SELECT AVG(DISTINCT t) OVER (PARTITION BY index),
AVG(DISTINCT ts) OVER (PARTITION BY index),
AVG(DISTINCT dec) OVER (PARTITION BY index)
FROM windowing_distinct;
+
+-- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;
+
+-- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;
+
+-- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out
index 074a594..86d1cdd 100644
--- a/ql/src/test/results/clientpositive/windowing_distinct.q.out
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -128,3 +128,69 @@ POSTHOOK: Input: default@windowing_distinct
117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
+PREHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 0 0 2 1
+1 26.43 1 1 1 2
+1 96.91 1 1 0 2
+2 13.01 0 0 1 2
+2 74.72 1 1 1 2
+2 74.72 2 2 0 2
+PREHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 NULL NULL 123.34000396728516 26.43000030517578
+1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 123.34000396728516
+1 96.91 26.43000030517578 26.43000030517578 NULL 123.34000396728516
+2 13.01 NULL NULL 74.72000122070312 87.73000144958496
+2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 87.73000144958496
+2 74.72 87.73000144958496 87.73000144958496 NULL 87.73000144958496
+PREHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 NULL NULL 61.67000198364258 26.43000030517578
+1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 61.67000198364258
+1 96.91 26.43000030517578 26.43000030517578 NULL 61.67000198364258
+2 13.01 NULL NULL 74.72000122070312 43.86500072479248
+2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 43.86500072479248
+2 74.72 43.86500072479248 43.86500072479248 NULL 43.86500072479248
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index c58e8ed..1ac72c6 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -117,6 +117,44 @@ public final class ObjectInspectorUtils {
}
/**
+ * This class can be used to wrap Hive objects and put in HashMap or HashSet.
+ * The objects will be compared using ObjectInspectors.
+ *
+ */
+ public static class ObjectInspectorObject {
+ private final Object[] objects;
+ private final ObjectInspector[] oi;
+
+ public ObjectInspectorObject(Object object, ObjectInspector oi) {
+ this.objects = new Object[] { object };
+ this.oi = new ObjectInspector[] { oi };
+ }
+
+ public ObjectInspectorObject(Object[] objects, ObjectInspector[] oi) {
+ this.objects = objects;
+ this.oi = oi;
+ }
+
+ public Object[] getValues() {
+ return objects;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || obj.getClass() != this.getClass()) { return false; }
+
+ ObjectInspectorObject comparedObject = (ObjectInspectorObject)obj;
+ return ObjectInspectorUtils.compare(objects, oi, comparedObject.objects, comparedObject.oi) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return ObjectInspectorUtils.getBucketHashCode(objects, oi);
+ }
+ }
+
+ /**
* Calculates the hash code for array of Objects that contains writables. This is used
* to work around the buggy Hadoop DoubleWritable hashCode implementation. This should
* only be used for process-local hash codes; don't replace stored hash codes like bucketing.