Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/15 09:00:50 UTC

[GitHub] [spark] nyingping opened a new pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

nyingping opened a new pull request #35526:
URL: https://github.com/apache/spark/pull/35526


   
   ### What changes were proposed in this pull request?
    At present, the sliding window is implemented as expand + filter, but in some cases the filter is not necessary.
    
    Filtering is only required when the sliding window is irregular. When the window duration divided by the slide duration is an integer (which I believe is also the case for most real-world uses of sliding windows), no filtering is needed, which saves computation resources and improves performance (see the sketch below).
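    
    For illustration, a minimal sketch of the two cases (the input DataFrame `df` with a `time` column is assumed for the example and is not part of this PR):
    
    ```scala
    import org.apache.spark.sql.functions.{col, window}
    
    // Regular case: 15s window, 5s slide -> 15 % 5 == 0. Every row expands into exactly
    // 15 / 5 = 3 windows, all of which contain the event time, so no filter on
    // window.start / window.end is needed.
    val regular = df.select(window(col("time"), "15 seconds", "5 seconds"))
    
    // Irregular case: 15s window, 4s slide -> 15 % 4 != 0. The expansion over-produces
    // candidate windows, so rows must still be filtered against the window bounds.
    val irregular = df.select(window(col("time"), "15 seconds", "4 seconds"))
    ```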
   
   
   ### Why are the changes needed?
    It saves computation resources and improves performance.
   
   
   ### Does this PR introduce _any_ user-facing change?
    No.
   
   ### How was this patch tested?
    Unit tests and a benchmark.
    
    A simple benchmark is in this [commit](https://github.com/nyingping/spark/commit/cccc742f601cffca99ab602165c024b3523ebc72); thanks to [HeartSaVioR@d532b6f](https://github.com/HeartSaVioR/spark/commit/d532b6f6bcdd80cdaac520b21587ebb69ff2df8f).
   
   > spark.range(numOfRow)
   >       .selectExpr("CAST(id AS timestamp) AS time")
   >       .select(window(col("time"), "15 seconds", "3 seconds", "2 seconds"))
   >       .count()
   
   Result:
   
   ```
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_291-b10 on Windows 10 10.0
   AMD64 Family 23 Model 96 Stepping 1, AuthenticAMD
   sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------------------------------
   old logic                                           799            866          70         12.5          79.9       1.0X
   new logic                                            58             68           9        171.2           5.8      13.7X
   ```




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807405322



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3927,8 +3927,12 @@ object TimeWindowing extends Rule[LogicalPlan] {
           val projections = windows.map(_ +: child.output)
 
           val filterExpr =
-            window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-              window.timeColumn < windowAttr.getField(WINDOW_END)
+            if (window.windowDuration % window.slideDuration == 0) {

Review comment:
       Probably good to leave a code comment like the one below:
    
    > When the condition `windowDuration % slideDuration = 0` is fulfilled, the estimation of the number of windows becomes exact, which means all produced windows are valid.
    
    I'm not a native speaker, so the sentence may not be perfect, but it should be acceptable and understandable.
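    
    For reference, a rough sketch of how the conditional plus such a comment could look in `TimeWindowing`. The `else` branch mirrors the removed lines above; the `IsNotNull` expression in the then-branch is an assumption (it keeps dropping null timestamps, as the old comparison did) and is not necessarily what the PR finally merges:
    
    ```scala
    val filterExpr =
      if (window.windowDuration % window.slideDuration == 0) {
        // When windowDuration % slideDuration == 0, the estimated number of windows
        // becomes exact, so every produced window is valid; only null timestamps
        // still need to be dropped.
        IsNotNull(window.timeColumn)
      } else {
        window.timeColumn >= windowAttr.getField(WINDOW_START) &&
          window.timeColumn < windowAttr.getField(WINDOW_END)
      }
    ```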

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,46 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {

Review comment:
       It would be nice if we were clear about what we want to test.
    
    If we want to verify that the change still produces the right windows, I'd rather verify the boundaries of the time range explicitly, even if it is more verbose.
    
    If such a test already exists and we want to ensure we don't inject a comparison expression into the time-window calculation, we probably need to look into the logical plan (especially the Filter node) and verify the expression used in the Filter (see the sketch below).
    
    If such a test already exists and you feel uneasy verifying the expression in the Filter node, it's OK to skip adding the test, since it means the functionality is validated by existing tests.
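    
    A minimal sketch of that plan-inspection idea, reusing names from the test code above (illustrative only, not the final test):
    
    ```scala
    // Locate the Filter node in the optimized logical plan and check that no
    // window-boundary comparison (>= / <) was injected for this query.
    val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
    assert(filter.isDefined)
    val boundaryChecks = filter.get.constraints.filter { e =>
      e.toString.contains(">=") || e.toString.contains("<")
    }
    assert(boundaryChecks.isEmpty,
      "No need to filter windows when windowDuration is multiple of slideDuration")
    ```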






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808699368



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -521,7 +521,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     Seq(df1, df2, df3, df4).foreach { df =>
       val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
       assert(filter.isDefined)
-      val exist = filter.get.constraints.filter(e =>
+      val exist = filter.get.constraints.filter( e =>

Review comment:
       nit: The previous version was correct per the style guide. Please refer to the guidance below:
    https://github.com/databricks/scala-style-guide#anonymous-methods
    
    When you use parentheses for an anonymous method, there is no space; when you use curly braces, there is a space.
    
    For multi-line anonymous methods we prefer curly braces, like:
   
   ```
   val exist = filter.get.constraints.filter { e =>
     e.toString.contains(">=") || e.toString.contains("<")
   }
   ```






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808602851



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>

Review comment:
       I got it.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808624489



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter windows when windowDuration is multiple of slideDuration") {
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2, df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      assert(filter.isDefined)
+      val exist = filter.get.constraints.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter windows " +
+        "when windowDuration is multiple of slideDuration")
+    }
+  }
+

Review comment:
       nit: unnecessary empty line






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808588658



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I agree that "`redundant`" is a poor description. How about "`No need to filter data when the windowDuration of a sliding window is an integer multiple of slideDuration`" instead? If this is accurate enough, I can use it as the name of the test case.






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808615040



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    // check produces right windows

Review comment:
       I think the test case called "millisecond precision sliding windows" already covers this situation.






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808701877



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -521,7 +521,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     Seq(df1, df2, df3, df4).foreach { df =>
       val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
       assert(filter.isDefined)
-      val exist = filter.get.constraints.filter(e =>
+      val exist = filter.get.constraints.filter( e =>

Review comment:
       It has been changed back. Thanks!






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808626843



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter windows when windowDuration is multiple of slideDuration") {
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2, df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      assert(filter.isDefined)
+      val exist = filter.get.constraints.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter windows " +
+        "when windowDuration is multiple of slideDuration")
+    }
+  }
+

Review comment:
       Got it. Sorry for my carelessness.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807907460



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>

Review comment:
       1. Please add `assert(filter.isDefined)` on the line above.
    2. You may be able to call `constraints.filter { e => ... }` directly instead of adding `.iterator.toStream`.
   

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>

Review comment:
       I guess we can test df1, df2, df3, df4 here instead of having two separate code blocks.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(

Review comment:
       We don't need to validate the results here since other tests also do this.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>

Review comment:
       This can be removed if we test against df1 to df4 above.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    // check produces right windows

Review comment:
       We don't need to validate the results here since other tests already do this. If you feel we don't have matching tests, please add a separate general test.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       Honestly, I don't get what "`when the sliding window length is not redundant`" means. Could you please elaborate on the meaning of `redundant` here, or could we use a mathematical expression like `multiple of` or `a factor of`?






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808594997



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    // check produces right windows

Review comment:
       Most of the existing tests for sliding windows use the condition `windowDuration % slideDuration != 0`. The only exception is the test case called "`millisecond precision sliding windows`", but it focuses on millisecond precision. So I'm not sure whether I need to add a new case to verify that the change still produces the right windows.






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808588658



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I agree that "`redundant`" is a poor description. How about "`No need to filter data when the windowDuration of a sliding window is an integer multiple of slideDuration`" instead? If this is accurate enough, I can use it as the name of the test case and the assert message.






[GitHub] [spark] nyingping commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1040149788


   I'll check it later.




[GitHub] [spark] HeartSaVioR commented on pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1043700549


   OK, no feedback during working hours in the US timezone.
   
   Thanks! Merging to master.




[GitHub] [spark] nyingping commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1041134095


   Sorry, it's my fault. I mixed the update history of the previous branch with the present one, which caused interference and misunderstanding.




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808609616



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    // check produces right windows

Review comment:
       Please add the case of `windowDuration % slideDuration == 0` to one of the existing tests if you feel we missed it. Let's focus the new test on verifying the actual change (an illustrative sketch follows below).
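    
    If it helps, an illustrative sketch of such a case (timestamps and durations are made up; it assumes the suite's existing imports and helpers):
    
    ```scala
    // windowDuration (10s) is a multiple of slideDuration (5s): every generated
    // window is valid, so the results can be checked without relying on the removed filter.
    val df = Seq(("2022-02-15 19:39:27", 4)).toDF("time", "value")
      .select(window($"time", "10 seconds", "5 seconds"), $"value")
    
    checkAnswer(
      df.select($"window.start".cast("string"), $"window.end".cast("string"), $"value"),
      Seq(
        Row("2022-02-15 19:39:20", "2022-02-15 19:39:30", 4),
        Row("2022-02-15 19:39:25", "2022-02-15 19:39:35", 4))
    )
    ```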






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808590579



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>

Review comment:
       I got it.






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808610496



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I got it. Thanks!






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808610496



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I got it. Thanks!






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808588658



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I agree that "`redundant`" is a poor description. How about "`No need to filter data when the windowDuration of a sliding window is an integer multiple of slideDuration`" or "`No need to filter data when windowDuration is multiple of slideDuration for sliding windows`" instead? If either is accurate enough, I can use it as the new name of the test case.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808629052



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter windows when windowDuration is multiple of slideDuration") {
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2, df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      assert(filter.isDefined)
+      val exist = filter.get.constraints.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter windows " +
+        "when windowDuration is multiple of slideDuration")
+    }
+  }
+

Review comment:
       No worries. A nit is really just a nit that anyone can miss.






[GitHub] [spark] HeartSaVioR closed pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #35526:
URL: https://github.com/apache/spark/pull/35526


   




[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807504401



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3927,8 +3927,12 @@ object TimeWindowing extends Rule[LogicalPlan] {
           val projections = windows.map(_ +: child.output)
 
           val filterExpr =
-            window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-              window.timeColumn < windowAttr.getField(WINDOW_END)
+            if (window.windowDuration % window.slideDuration == 0) {

Review comment:
       @HeartSaVioR  thanks!
   
   I've added code comments as you suggested. I think your suggestion is good enough.
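
   The hunk quoted above is truncated after the `if` line; a rough sketch of how the conditional filter could read — assuming the even-division branch only needs to drop rows with a null time column, which is an assumption here rather than a quote of the merged code — is:

   ```scala
   val filterExpr =
     if (window.windowDuration % window.slideDuration == 0) {
       // windowDuration divides evenly by slideDuration, so every generated window
       // already contains the event time; only null event times need to be dropped.
       IsNotNull(window.timeColumn)
     } else {
       // Irregular case: some generated windows do not contain the event time,
       // so keep the original range filter on the window boundaries.
       window.timeColumn >= windowAttr.getField(WINDOW_START) &&
         window.timeColumn < windowAttr.getField(WINDOW_END)
     }
   ```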






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808608809



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       `No need to filter windows when windowDuration is multiple of slideDuration` looks OK to me. A tumbling window means windowDuration = slideDuration, so we don't need to explicitly say `sliding windows`. Please change the title of the PR as well. Thanks!






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807402650



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,46 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {

Review comment:
       It would be nice to be clear about what we want to test.
   
   If we want to verify that the change still produces the right windows, I'd rather verify the boundaries of the time range explicitly instead of just checking the value column, even if that is more verbose. The test would be a general one, so it doesn't need a JIRA ticket number, and its name should also be general.
   
   If we already have such a test and want to ensure we don't inject a comparison expression into the calculation of the time window, we probably need to look into the logical plan (especially the Filter) and verify the expression used in the Filter.
   
   If we already have such a test and you feel uneasy about verifying the expression in the Filter node, it's OK to skip adding the new test, since that means the functionality is validated by the existing tests.
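
   As an illustration of that kind of plan-level check, a sketch only — it pattern-matches the Filter's condition directly rather than stringifying constraints, and assumes a DataFrame `df` built with `window(...)`:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.{GreaterThanOrEqual, LessThan}
   import org.apache.spark.sql.catalyst.plans.logical.Filter

   // Locate the Filter node in the optimized plan and verify its condition carries
   // no range comparison on the window boundaries.
   val filterNode = df.queryExecution.optimizedPlan.collectFirst { case f: Filter => f }
   assert(filterNode.isDefined)
   val rangePredicates = filterNode.get.condition.collect {
     case e @ (_: GreaterThanOrEqual | _: LessThan) => e
   }
   assert(rangePredicates.isEmpty,
     "windows should not be range-filtered when windowDuration is a multiple of slideDuration")
   ```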






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807402650



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,46 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {

Review comment:
       It would be nice to be clear about what we want to test.
   
   If we want to verify that the change still produces the right windows, I'd rather verify the boundaries of the time range explicitly instead of just checking the value column, even if that is more verbose.
   
   If we already have such a test and want to ensure we don't inject a comparison expression into the calculation of the time window, we probably need to look into the logical plan (especially the Filter) and verify the expression used in the Filter.
   
   If we already have such a test and you feel uneasy about verifying the expression in the Filter node, it's OK to skip adding the new test, since that means the functionality is validated by the existing tests.






[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808588658



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")

Review comment:
       I agree that "`redundant`" is a poor description. How about "`No need to filter data when the windowDuration of a sliding window is an integer multiple of slideDuration`" or "`No need to filter data when windowDuration is multiple of slideDuration for sliding windows`" instead? If either is accurate enough, I can use it as the new name of the test case.






[GitHub] [spark] nyingping commented on pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
nyingping commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1043704339


   @HeartSaVioR Thank you very much for the review!




[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r807503946



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,46 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {

Review comment:
       @HeartSaVioR thank you for the review!
   
   I have updated the test case.






[GitHub] [spark] nyingping commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1040149788


   I'll check it later.




[GitHub] [spark] AmplabJenkins commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1041132771


   Can one of the admins verify this patch?




[GitHub] [spark] nyingping commented on a change in pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
nyingping commented on a change in pull request #35526:
URL: https://github.com/apache/spark/pull/35526#discussion_r808594997



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
##########
@@ -490,4 +490,81 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    // check produces right windows

Review comment:
       Most of the existing tests for sliding windows are under the condition `windowDuration % slideDuration != 0`. The only exception is the test case called "`millisecond precision sliding windows`", but it focuses on millisecond precision. So I'm not sure whether I need to add a new case to verify that the change still produces the right windows.
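
   To make the distinction concrete, an illustrative sketch (not code from the suite; `df` here is just an assumed input with a `time` column):

   ```scala
   // Irregular case: 10 % 3 != 0. Up to ceil(10 / 3) = 4 candidate windows are
   // generated per event, and those that do not contain the event must be filtered out.
   val irregular = df.select(window($"time", "10 seconds", "3 seconds"))

   // Regular case: 9 % 3 == 0. Exactly 9 / 3 = 3 windows are generated per event,
   // and every one of them contains the event, so no range filter is needed.
   val regular = df.select(window($"time", "9 seconds", "3 seconds"))
   ```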






[GitHub] [spark] HeartSaVioR commented on pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1042606841


   I'll leave this open for a day to give others a chance to review. I'll merge it tomorrow if there's no new feedback.




[GitHub] [spark] HeartSaVioR commented on pull request #35526: [SPARK-38214][SS]No need to filter windows when windowDuration is multiple of slideDuration

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1043702003


   Thanks @nyingping for the contribution! I merged this into master.




[GitHub] [spark] HeartSaVioR commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1041009520


   Yeah, I meant an additional optimization along with the previous one. Sorry if I confused you.




[GitHub] [spark] viirya commented on pull request #35526: [SPARK-38214][SS]No need to filter data when the sliding window length is not redundant

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35526:
URL: https://github.com/apache/spark/pull/35526#issuecomment-1041004770


   Is this a follow-up of https://github.com/apache/spark/pull/35362? It looks like a different one, but seems okay. I will re-check it later.

