Posted to commits@spark.apache.org by sr...@apache.org on 2018/07/17 16:25:25 UTC

spark git commit: [SPARK-21590][SS] Window start time should support negative values

Repository: spark
Updated Branches:
  refs/heads/master 5215344de -> 7688ce88b


[SPARK-21590][SS] Window start time should support negative values

## What changes were proposed in this pull request?

Remove the non-negative check on the window start time so that windows support negative start times, and add a check to guarantee that the absolute value of the start time is less than the slide duration.
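
For illustration, a minimal sketch of what this change allows through the DataFrame API. The data mirrors the new test in `DataFrameTimeWindowingSuite`; the SparkSession setup and app name are only there to make the sketch self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}

val spark = SparkSession.builder().appName("negative-window-start").getOrCreate()
import spark.implicits._

// A negative startTime shifts every window boundary backwards: with a
// 10-second tumbling window and a -5 second start time, a record at
// 19:39:30 lands in [19:39:25, 19:39:35) rather than [19:39:30, 19:39:40).
val eventsDF = Seq(
  ("2016-03-27 19:39:30", 1),
  ("2016-03-27 19:39:25", 2)).toDF("time", "value")

val counts = eventsDF
  .groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"))
  .agg(count("*").as("counts"))
```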

## How was this patch tested?

New unit tests.

Author: HanShuliang <ke...@gmail.com>

Closes #18903 from KevinZwx/dev.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7688ce88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7688ce88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7688ce88

Branch: refs/heads/master
Commit: 7688ce88b2ea514054200845ae860fbccc25a927
Parents: 5215344
Author: HanShuliang <ke...@gmail.com>
Authored: Tue Jul 17 11:25:23 2018 -0500
Committer: Sean Owen <sr...@gmail.com>
Committed: Tue Jul 17 11:25:23 2018 -0500

----------------------------------------------------------------------
 .../sql/catalyst/expressions/TimeWindow.scala   |  9 ++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 25 +++++++----
 .../catalyst/expressions/TimeWindowSuite.scala  | 13 ++++++
 .../spark/sql/DataFrameTimeWindowingSuite.scala | 45 ++++++++++++++++++++
 4 files changed, 77 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 84e38a8..8e48856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -80,16 +80,13 @@ case class TimeWindow(
       if (slideDuration <= 0) {
         return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
       }
-      if (startTime < 0) {
-        return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
-      }
       if (slideDuration > windowDuration) {
         return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
           s" to the windowDuration ($windowDuration).")
       }
-      if (startTime >= slideDuration) {
-        return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
-          s"slideDuration ($slideDuration).")
+      if (startTime.abs >= slideDuration) {
+        return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
+          s"than the slideDuration ($slideDuration).")
       }
     }
     dataTypeCheck
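
A rough sketch of the window-start arithmetic behind this relaxed check (the helper below is illustrative, not the exact Spark internals): the most recent window start at or before a timestamp is determined by the slide duration and the start offset, and an offset of one full slide or more would be equivalent modulo the slide to a smaller one, which is why only |startTime| < slideDuration needs to hold.

```scala
// Illustrative only: last window start at or before `ts`, all values in
// microseconds. The double modulo keeps the result correct when the
// remainder would otherwise be negative.
def lastWindowStart(ts: Long, slide: Long, start: Long): Long =
  ts - (((ts - start) % slide + slide) % slide)

// Matches the new DataFrameTimeWindowingSuite test data: 19:39:30 with a
// 10-second window and a -5 second start time falls in [19:39:25, 19:39:35).
val slide = 10L * 1000000L   // 10 seconds
val start = -5L * 1000000L   // -5 seconds
val ts    = 30L * 1000000L   // seconds-within-minute part of 19:39:30
assert(lastWindowStart(ts, slide, start) == 25L * 1000000L)
```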

http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 5d2f8e7..0ce94d3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -334,14 +334,28 @@ class AnalysisErrorSuite extends AnalysisTest {
     "start time greater than slide duration in time window",
     testRelation.select(
       TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
-      "The start time " :: " must be less than the slideDuration " :: Nil
+      "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
   )
 
   errorTest(
     "start time equal to slide duration in time window",
     testRelation.select(
       TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
-      "The start time " :: " must be less than the slideDuration " :: Nil
+      "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
+  )
+
+  errorTest(
+    "SPARK-21590: absolute value of start time greater than slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 minute").as("window")),
+    "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
+  )
+
+  errorTest(
+    "SPARK-21590: absolute value of start time equal to slide duration in time window",
+    testRelation.select(
+      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 second").as("window")),
+    "The absolute value of start time " :: " must be less than the slideDuration " :: Nil
   )
 
   errorTest(
@@ -373,13 +387,6 @@ class AnalysisErrorSuite extends AnalysisTest {
   )
 
   errorTest(
-    "negative start time in time window",
-    testRelation.select(
-      TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
-      "The start time" :: "must be greater than or equal to 0." :: Nil
-  )
-
-  errorTest(
     "generator nested in expressions",
     listRelation.select(Explode('list) + 1),
     "Generators are not supported when it's nested in expressions, but got: (explode(list) + 1)"

http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
index 351d4d0..d46135c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -77,6 +77,19 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva
     }
   }
 
+  test("SPARK-21590: Start time works with negative values and return microseconds") {
+    val validDuration = "10 minutes"
+    for ((text, seconds) <- Seq(
+      ("-10 seconds", -10000000), // -1e7
+      ("-1 minute", -60000000),
+      ("-1 hour", -3600000000L))) { // -6e7
+      assert(TimeWindow(Literal(10L), validDuration, validDuration, "interval " + text).startTime
+        === seconds)
+      assert(TimeWindow(Literal(10L), validDuration, validDuration, text).startTime
+        === seconds)
+    }
+  }
+
   private val parseExpression = PrivateMethod[Long]('parseExpression)
 
   test("parse sql expression for duration in microseconds - string") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7688ce88/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 6fe3568..2953425 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -43,6 +43,22 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
     )
   }
 
+  test("SPARK-21590: tumbling window using negative start time") {
+    val df = Seq(
+      ("2016-03-27 19:39:30", 1, "a"),
+      ("2016-03-27 19:39:25", 2, "a")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"))
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
+      Seq(
+        Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 2)
+      )
+    )
+  }
+
   test("tumbling window groupBy statement") {
     val df = Seq(
       ("2016-03-27 19:39:34", 1, "a"),
@@ -72,6 +88,20 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
       Seq(Row(1), Row(1), Row(1)))
   }
 
+  test("SPARK-21590: tumbling window groupBy statement with negative startTime") {
+    val df = Seq(
+      ("2016-03-27 19:39:34", 1, "a"),
+      ("2016-03-27 19:39:56", 2, "a"),
+      ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
+
+    checkAnswer(
+      df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"), $"id")
+        .agg(count("*").as("counts"))
+        .orderBy($"window.start".asc)
+        .select("counts"),
+      Seq(Row(1), Row(1), Row(1)))
+  }
+
   test("tumbling window with multi-column projection") {
     val df = Seq(
         ("2016-03-27 19:39:34", 1, "a"),
@@ -309,4 +339,19 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
       )
     }
   }
+
+  test("SPARK-21590: time window in SQL with three expressions including negative start time") {
+    withTempTable { table =>
+      checkAnswer(
+        spark.sql(
+          s"""select window(time, "10 seconds", 10000000, "-5 seconds"), value from $table""")
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
+          Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
+          Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
+        )
+      )
+    }
+  }
 }

