You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hive.apache.org by ai...@apache.org on 2015/12/08 02:28:14 UTC
hive git commit: HIVE-12574: windowing function returns incorrect result when the window size is larger than the partition size (reviewed by Jimmy Xiang)
Repository: hive
Updated Branches:
refs/heads/master ee63dcafe -> a7927b1b8
HIVE-12574: windowing function returns incorrect result when the window size is larger than the partition size (reviewed by Jimmy Xiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7927b1b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7927b1b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7927b1b
Branch: refs/heads/master
Commit: a7927b1b87e41745d34238054f5e243c7dd95d17
Parents: ee63dca
Author: Aihua Xu <ai...@apache.org>
Authored: Wed Dec 2 18:04:46 2015 -0500
Committer: Aihua Xu <ai...@apache.org>
Committed: Mon Dec 7 20:27:22 2015 -0500
----------------------------------------------------------------------
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 +-
.../ql/udf/generic/GenericUDAFFirstValue.java | 12 ++--
.../ql/udf/generic/GenericUDAFLastValue.java | 9 ++-
.../hive/ql/udf/generic/GenericUDAFMax.java | 14 +++--
.../generic/GenericUDAFStreamingEvaluator.java | 28 ++++++----
.../hive/ql/udf/ptf/WindowingTableFunction.java | 18 +++---
.../clientpositive/windowing_windowspec4.q | 19 +++++++
.../clientpositive/windowing_windowspec4.q.out | 59 ++++++++++++++++++++
8 files changed, 129 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dca3081..5803a9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11771,9 +11771,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
else
{
int amt = Integer.parseInt(amtNode.getText());
- if ( amt < 0 ) {
+ if ( amt <= 0 ) {
throw new SemanticException(
- "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+ "Window Frame Boundary Amount must be a positive integer, provided amount is: " + amt);
}
bs.setAmt(amt);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index 160ce91..7ad4eb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -262,7 +262,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
}
}
- if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+ if (s.hasResultReady()) {
/*
* if skipNulls is true and there are no rows in valueChain => all rows
* in partition are null so far; so add null in o/p
@@ -293,12 +293,14 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
// For the case: X following and Y following, process first Y-X results and then insert X nulls.
// For the case X preceding and Y following, process Y results.
for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
- s.results.add(r == null ? null : r.val);
+ if (s.hasResultReady()) {
+ s.results.add(r == null ? null : r.val);
+ }
s.numRows++;
if (r != null) {
int fIdx = (Integer) r.idx;
if (!wFrameDef.isStartUnbounded()
- && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+ && s.numRows >= fIdx + wFrameDef.getWindowSize()
&& !s.valueChain.isEmpty()) {
s.valueChain.removeFirst();
r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
@@ -307,7 +309,9 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
}
for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
- s.results.add(null);
+ if (s.hasResultReady()) {
+ s.results.add(null);
+ }
s.numRows++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index f917621..4989a0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -251,10 +251,15 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver {
// For the case: X following and Y following, process first Y-X results and then insert X nulls.
// For the case X preceding and Y following, process Y results.
for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
- s.results.add(s.lastValue);
+ if (s.hasResultReady()) {
+ s.results.add(s.lastValue);
+ }
+ s.numRows++;
}
for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
- s.results.add(null);
+ if (s.hasResultReady()) {
+ s.results.add(null);
+ }
s.numRows++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index 98abd5c..43b23fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -251,7 +251,7 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver {
s.maxChain.addLast(new Object[] { o, s.numRows });
}
- if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+ if (s.hasResultReady()) {
s.results.add(s.maxChain.getFirst()[0]);
}
s.numRows++;
@@ -287,20 +287,24 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver {
// For the case: X following and Y following, process first Y-X results and then insert X nulls.
// For the case X preceding and Y following, process Y results.
for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
- s.results.add(r == null ? null : r[0]);
+ if (s.hasResultReady()) {
+ s.results.add(r == null ? null : r[0]);
+ }
s.numRows++;
if (r != null) {
int fIdx = (Integer) r[1];
if (!wFrameDef.isStartUnbounded()
- && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+ && s.numRows >= fIdx + wFrameDef.getWindowSize()
&& !s.maxChain.isEmpty()) {
s.maxChain.removeFirst();
- r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;
+ r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : null;
}
}
}
for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
- s.results.add(null);
+ if (s.hasResultReady()) {
+ s.results.add(null);
+ }
s.numRows++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
index 3c76404..d2e1b26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
@@ -56,6 +56,16 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
results.clear();
numRows = 0;
}
+
+ /**
+ * For the cases "X preceding and Y preceding" or the number of processed rows
+ * is at least the size of the FOLLOWING window, we are able to generate a PTF result
+ * for a previous row.
+ * @return true once numRows has reached the relative offset of the frame's end boundary
+ */
+ public boolean hasResultReady() {
+ return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
+ }
}
@Override
@@ -141,16 +151,6 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
}
/**
- * For the cases "X preceding and Y preceding" or the number of processed rows
- * is more than the size of FOLLOWING window, we are able to generate a PTF result
- * for a previous row.
- * @return
- */
- public boolean hasResultReady() {
- return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
- }
-
- /**
* Retrieve the next stored intermediate result, i.e.,
* Get S[x-1] in the computation of S[x..y] = S[y] - S[x-1].
*/
@@ -206,11 +206,15 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
// For the case: X following and Y following, process first Y-X results and then insert X nulls.
// For the case X preceding and Y following, process Y results.
for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
- ss.results.add(getNextResult(ss));
+ if (ss.hasResultReady()) {
+ ss.results.add(getNextResult(ss));
+ }
ss.numRows++;
}
for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
- ss.results.add(null);
+ if (ss.hasResultReady()) {
+ ss.results.add(null);
+ }
ss.numRows++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 9d21103..2c076f50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -496,18 +496,18 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
} else {
while (numRowsRemaining > 0) {
- int rowToProcess = streamingState.rollingPart.size()
- - numRowsRemaining;
- Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
- streamingState.order);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- Object out = evaluateWindowFunction(wFn, rItr);
- streamingState.fnOutputs[i].add(out);
+ int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining;
+ if (rowToProcess >= 0) {
+ Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+ streamingState.order);
+ PTFPartitionIterator<Object> rItr = rng.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+ Object out = evaluateWindowFunction(wFn, rItr);
+ streamingState.fnOutputs[i].add(out);
+ }
numRowsRemaining--;
}
}
-
}
List<Object> oRows = new ArrayList<Object>();
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_windowspec4.q b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
new file mode 100644
index 0000000..fcf0f25
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
@@ -0,0 +1,19 @@
+--Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing;
+
+create table smalltable_windowing(
+ i int,
+ type string);
+insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a');
+
+select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing;
http://git-wip-us.apache.org/repos/asf/hive/blob/a7927b1b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_windowspec4.q.out b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
new file mode 100644
index 0000000..60e20ef
--- /dev/null
+++ b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
@@ -0,0 +1,59 @@
+PREHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table smalltable_windowing(
+ i int,
+ type string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: create table smalltable_windowing(
+ i int,
+ type string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@smalltable_windowing
+PREHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@smalltable_windowing
+POSTHOOK: Lineage: smalltable_windowing.i EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: smalltable_windowing.type SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+PREHOOK: type: QUERY
+PREHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+POSTHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+a 1 3 1 1 3 2.0 6 [1,2,3] 3
+a 2 3 1 1 3 2.0 6 [1,2,3] 3
+a 3 3 2 2 3 2.5 5 [2,3] 2