You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/02/01 19:15:15 UTC
[spark] branch master updated: [SPARK-26806][SS]
EventTimeStats.merge should handle zeros correctly
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 03a928c [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
03a928c is described below
commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Fri Feb 1 11:15:05 2019 -0800
[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
## What changes were proposed in this pull request?
Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` correctly. This will make `avg` become `NaN`. And whatever gets merged with the result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call `NaN.toLong` and get `0`, and the user will see the following incorrect report:
```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
}
```
This issue was reported by liancheng.
This PR fixes the above issue.
## How was this patch tested?
The new unit tests.
Closes #23718 from zsxwing/merge-zero.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../streaming/EventTimeWatermarkExec.scala | 17 +++++++++---
.../sql/streaming/EventTimeWatermarkSuite.scala | 32 ++++++++++++++++++++--
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var cou
}
def merge(that: EventTimeStats): Unit = {
- this.max = math.max(this.max, that.max)
- this.min = math.min(this.min, that.min)
- this.count += that.count
- this.avg += (that.avg - this.avg) * that.count / this.count
+ if (that.count == 0) {
+ // no-op
+ } else if (this.count == 0) {
+ this.max = that.max
+ this.min = that.min
+ this.count = that.count
+ this.avg = that.avg
+ } else {
+ this.max = math.max(this.max, that.max)
+ this.min = math.min(this.min, that.min)
+ this.count += that.count
+ this.avg += (that.avg - this.avg) * that.count / this.count
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c696204..b79770a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
sqlContext.streams.active.foreach(_.stop())
}
- test("EventTimeStats") {
- val epsilon = 10E-6
+ private val epsilon = 10E-6
+ test("EventTimeStats") {
val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
stats.add(80L)
stats.max should be (100)
@@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}
test("EventTimeStats: avg on large values") {
- val epsilon = 10E-6
val largeValue = 10000000000L // 10B
// Make sure `largeValue` will cause overflow if we use a Long sum to calc avg.
assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
stats.avg should be ((largeValue + 0.5) +- epsilon)
}
+ test("EventTimeStats: zero merge zero") {
+ val stats = EventTimeStats.zero
+ val stats2 = EventTimeStats.zero
+ stats.merge(stats2)
+ stats should be (EventTimeStats.zero)
+ }
+
+ test("EventTimeStats: non-zero merge zero") {
+ val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+ val stats2 = EventTimeStats.zero
+ stats.merge(stats2)
+ stats.max should be (10L)
+ stats.min should be (1L)
+ stats.avg should be (5.0 +- epsilon)
+ stats.count should be (3L)
+ }
+
+ test("EventTimeStats: zero merge non-zero") {
+ val stats = EventTimeStats.zero
+ val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+ stats.merge(stats2)
+ stats.max should be (10L)
+ stats.min should be (1L)
+ stats.avg should be (5.0 +- epsilon)
+ stats.count should be (3L)
+ }
+
test("error on bad column") {
val inputData = MemoryStream[Int].toDF()
val e = intercept[AnalysisException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org