You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/07/17 16:25:25 UTC
spark git commit: [SPARK-21590][SS] Window start time should support negative values
Repository: spark
Updated Branches:
refs/heads/master 5215344de -> 7688ce88b
[SPARK-21590][SS] Window start time should support negative values
## What changes were proposed in this pull request?
Remove the non-negative check on the window start time so that windows support a negative start time, and add a check that guarantees the absolute value of the start time is less than the slide duration.
## How was this patch tested?
New unit tests.
Author: HanShuliang <ke...@gmail.com>
Closes #18903 from KevinZwx/dev.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7688ce88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7688ce88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7688ce88
Branch: refs/heads/master
Commit: 7688ce88b2ea514054200845ae860fbccc25a927
Parents: 5215344
Author: HanShuliang <ke...@gmail.com>
Authored: Tue Jul 17 11:25:23 2018 -0500
Committer: Sean Owen <sr...@gmail.com>
Committed: Tue Jul 17 11:25:23 2018 -0500
----------------------------------------------------------------------
.../sql/catalyst/expressions/TimeWindow.scala | 9 ++--
.../catalyst/analysis/AnalysisErrorSuite.scala | 25 +++++++----
.../catalyst/expressions/TimeWindowSuite.scala | 13 ++++++
.../spark/sql/DataFrameTimeWindowingSuite.scala | 45 ++++++++++++++++++++
4 files changed, 77 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 84e38a8..8e48856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -80,16 +80,13 @@ case class TimeWindow(
if (slideDuration <= 0) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
}
- if (startTime < 0) {
- return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
- }
if (slideDuration > windowDuration) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
s" to the windowDuration ($windowDuration).")
}
- if (startTime >= slideDuration) {
- return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
- s"slideDuration ($slideDuration).")
+ if (startTime.abs >= slideDuration) {
+ return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
+ s"than the slideDuration ($slideDuration).")
}
}
dataTypeCheck
http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 5d2f8e7..0ce94d3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -334,14 +334,28 @@ class AnalysisErrorSuite extends AnalysisTest {
"start time greater than slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
- "The start time " :: " must be less than the slideDuration " :: Nil
+ "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)
errorTest(
"start time equal to slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
- "The start time " :: " must be less than the slideDuration " :: Nil
+ "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
+ )
+
+ errorTest(
+ "SPARK-21590: absolute value of start time greater than slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 minute").as("window")),
+ "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
+ )
+
+ errorTest(
+ "SPARK-21590: absolute value of start time equal to slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 second").as("window")),
+ "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)
errorTest(
@@ -373,13 +387,6 @@ class AnalysisErrorSuite extends AnalysisTest {
)
errorTest(
- "negative start time in time window",
- testRelation.select(
- TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
- "The start time" :: "must be greater than or equal to 0." :: Nil
- )
-
- errorTest(
"generator nested in expressions",
listRelation.select(Explode('list) + 1),
"Generators are not supported when it's nested in expressions, but got: (explode(list) + 1)"
http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
index 351d4d0..d46135c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -77,6 +77,19 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva
}
}
+ test("SPARK-21590: Start time works with negative values and return microseconds") {
+ val validDuration = "10 minutes"
+ for ((text, seconds) <- Seq(
+ ("-10 seconds", -10000000), // -1e7
+ ("-1 minute", -60000000), // -6e7
+ ("-1 hour", -3600000000L))) { // -3.6e9
+ assert(TimeWindow(Literal(10L), validDuration, validDuration, "interval " + text).startTime
+ === seconds)
+ assert(TimeWindow(Literal(10L), validDuration, validDuration, text).startTime
+ === seconds)
+ }
+ }
+
private val parseExpression = PrivateMethod[Long]('parseExpression)
test("parse sql expression for duration in microseconds - string") {
http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 6fe3568..2953425 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -43,6 +43,22 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
)
}
+ test("SPARK-21590: tumbling window using negative start time") {
+ val df = Seq(
+ ("2016-03-27 19:39:30", 1, "a"),
+ ("2016-03-27 19:39:25", 2, "a")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"))
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
+ Seq(
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 2)
+ )
+ )
+ }
+
test("tumbling window groupBy statement") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
@@ -72,6 +88,20 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
Seq(Row(1), Row(1), Row(1)))
}
+ test("SPARK-21590: tumbling window groupBy statement with negative startTime") {
+ val df = Seq(
+ ("2016-03-27 19:39:34", 1, "a"),
+ ("2016-03-27 19:39:56", 2, "a"),
+ ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+ checkAnswer(
+ df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"), $"id")
+ .agg(count("*").as("counts"))
+ .orderBy($"window.start".asc)
+ .select("counts"),
+ Seq(Row(1), Row(1), Row(1)))
+ }
+
test("tumbling window with multi-column projection") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
@@ -309,4 +339,19 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
)
}
}
+
+ test("SPARK-21590: time window in SQL with three expressions including negative start time") {
+ withTempTable { table =>
+ checkAnswer(
+ spark.sql(
+ s"""select window(time, "10 seconds", 10000000, "-5 seconds"), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
+ Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
+ )
+ )
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org