Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/17 21:40:29 UTC

[GitHub] [spark] alex-balikov opened a new pull request, #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

alex-balikov opened a new pull request, #38288:
URL: https://github.com/apache/spark/pull/38288

   
   ### What changes were proposed in this pull request?
   
   This PR introduces a window_time function to extract the streaming event time from a window column produced by the window aggregating operators. This is one step in a sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming, as described in https://issues.apache.org/jira/browse/SPARK-40821.
   
   ### Why are the changes needed?
   The window_time function is a convenience function to compute the correct event time for window aggregate records. Such records, produced by the window aggregating operators, have no explicit event time but rather a window column of type StructType { start: TimestampType, end: TimestampType }, where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1 microsecond. The event time is necessary when chaining other stateful operators after the window aggregating operators.
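   
   As a rough illustration (editorial sketch, not part of the original PR text), a minimal example of the intended usage; the DataFrame, column names and durations below are made up for the example:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, count, window, window_time}
   
   val spark = SparkSession.builder().master("local[*]").appName("window_time-sketch").getOrCreate()
   import spark.implicits._
   
   // Hypothetical input with a (string) event-time column; window() casts it to timestamp.
   val events = Seq("2016-03-27 19:38:18", "2016-03-27 19:39:25").toDF("eventTime")
   
   // First aggregation over 5-minute tumbling windows; the grouping column is named "window".
   val firstAgg = events
     .groupBy(window(col("eventTime"), "5 minutes"))
     .agg(count("*").as("cnt"))
   
   // window_time(window) recovers an event time (window.end - 1 microsecond) that a
   // downstream stateful operator can use, e.g. to window the aggregated rows again.
   firstAgg
     .select(window_time(col("window")).as("eventTime"), col("cnt"))
     .show(truncate = false)
   ```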
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. The PR introduces a new window_time function, exposed as a SQL function and through both the Scala and Python APIs.
   
   ### How was this patch tested?
   
   Added new unit tests.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001254220


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,52 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(

Review Comment:
   Oh, we should also add this manually into `python/docs/source/reference/pyspark.sql/functions.rst`





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002782103


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")

Review Comment:
   cc. @MaxGekk friendly reminder.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002572937


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,64 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {

Review Comment:
   The actual test code which fails due to the rule is the following:
   
   ```
     test("2 window_time functions on raw window column") {
       val df = Seq(
         ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
       ).toDF("time")
   
       val df2 = df
         .withColumn("time2", expr("time - INTERVAL 5 minutes"))
         .select(window($"time", "10 seconds", "5 seconds").as("window1"), $"time2")
         .select($"window1", window($"time2", "10 seconds", "5 seconds").as("window2"))
   
       /*
         unresolved operator 'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))];
         'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))]
         +- Project [window1#10, window#16 AS window2#15]
            +- Filter isnotnull(cast(time2#6 as timestamp))
               +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), window1#10, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((p
 recisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), window1#10, time2#6]], [window#16, window1#10, time2#6]
                  +- Project [window#11 AS window1#10, time2#6]
                     +- Filter isnotnull(cast(time#4 as timestamp))
                        +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), time#4, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((pre
 cisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), time#4, time2#6]], [window#11, time#4, time2#6]
                           +- Project [time#4, cast(time#4 - INTERVAL '05' MINUTE as string) AS time2#6]
                              +- Project [value#1 AS time#4]
                                 +- LocalRelation [value#1]
        */
       df2.select(
         $"window1.end",
         window_time($"window1"),
         $"window2.end",
         window_time($"window2")
       )
     }
   ```
   
   The reason the above test case fails with an unresolved operator is that we do not resolve the two window_time calls referencing different windows. If we fix the rule to allow multiple window_time calls over different windows, it should just work.
   
   Btw, this code leads to a cartesian product of windows, but it passes the unsupported operation checker, whereas you'll hit the unsupported operation checker if you place it in a single select. Spark's unsupported operation checker is rule-based and not smart enough to capture all possibilities.
   
   That said, Spark can handle the cartesian product of windows. The unsupported operation checker is more restrictive than what Spark can actually do.
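   
   One possible workaround sketch under the current one-window_time-per-operator restriction (editorial, not verified in this thread) is to keep each call in its own projection, so the rule only sees one WindowTime expression per operator. It assumes the same test-suite context (`df2` and implicits) as the snippet above:
   
   ```scala
   // Each select contains exactly one window_time call, so the current rule
   // can resolve them one operator at a time. df2 is the DataFrame built above.
   val step1 = df2.select(
     $"window1", $"window2",
     window_time($"window1").cast("string").as("wt1"))
   val step2 = step1.select(
     $"wt1",
     window_time($"window2").cast("string").as("wt2"))
   ```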






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999152665


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and

Review Comment:
   I would use a DDL-formatted string (or other standard format if you prefer).
   
   ```suggestion
       `STRUCT<start: TIMESTAMP, end: TIMESTAMP>` where start is inclusive and
   ```





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r998918294


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {

Review Comment:
   Maybe it would be good to move out the rules for time window and session window as well while we are here.
   
   ps. It might be even better to consider moving out the other rules as well (this needs help from experts in the SQL area) so that no one would think it's proper to add a new rule to this file.





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001079639


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3777,6 +3777,23 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+  /**
+   * Extracts the event time from the window column of a record produced by window aggregation

Review Comment:
   reworded





[GitHub] [spark] cloud-fan commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r998905941


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   so one operator can only host one window time expression?





[GitHub] [spark] rxin commented on pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
rxin commented on PR #38288:
URL: https://github.com/apache/spark/pull/38288#issuecomment-1364413004

   Folks - don't we need to do pruning? The ResolveWindowTime rule is super expensive right now and will be applied to every node repeatedly until a fixed point is reached.
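   
   For reference, a hedged sketch (editorial, not from the thread) of what pattern-based pruning could look like. WINDOW_TIME is an assumed TreePattern constant here (wiring it up would also require WindowTime to report it via nodePatterns), and the rule body is left as a stub:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.WindowTime
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.catalyst.rules.Rule
   import org.apache.spark.sql.catalyst.trees.TreePattern.WINDOW_TIME
   
   // Sketch only: gate the traversal on a tree pattern so the rule skips plans
   // that contain no window_time at all, instead of visiting every node on every
   // fixed-point iteration.
   object ResolveWindowTimePruned extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan =
       plan.resolveOperatorsUpWithPruning(_.containsPattern(WINDOW_TIME)) {
         case p: LogicalPlan if p.children.size == 1 &&
             p.expressions.flatMap(_.collect { case w: WindowTime => w }).nonEmpty =>
           p // the existing ResolveWindowTime resolution logic would go here unchanged
       }
   }
   ```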
   




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r998806180


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")

Review Comment:
   Let's put some information about the column/expression in the error message. As it stands, it's not friendly for users who need to react based on the error message.
   
   Btw I don't have a good suggestion for the error framework - we can ping some folks to get some guidance. cc. @MaxGekk
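   
   In the meantime, a minimal sketch (editorial assumption, not the final error-class form) of how the offending column could be surfaced; this would slot into the quoted rule in place of the plain message above:
   
   ```scala
   // Sketch: surface the offending expression so users can tell which column
   // was rejected. A proper error-class based exception would replace this
   // plain message once the error framework question is settled.
   if (!metadata.contains(TimeWindow.marker) &&
       !metadata.contains(SessionWindow.marker)) {
     throw new AnalysisException(
       s"The input '${windowTime.windowColumn.sql}' is not a window column. " +
         "It must be produced by a window() or session_window() aggregation.")
   }
   ```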



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.

Review Comment:
   nit: Maybe "further aggregations" is too narrow explanation compared to what it can enable, if we consider deduplication as not an aggregation. Maybe "the correct event time of window", or "the correct representative time of window"?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	2
+        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	2021-01-01 00:09:59.999999	1
+        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	1
+  """,
+  group = "datetime_funcs",
+  since = "3.3.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  // FIXME: Should we check that windowColumn is a correct one - generated by

Review Comment:
   I think we are fine as long as the rule checks the marker.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .remove(TimeWindow.marker)
+          .remove(SessionWindow.marker)
+          .build()
+
+        val attr = AttributeReference(
+          "window_time", windowTime.dataType, metadata = newMetadata)()
+
+        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+        // it is, it is going to be bound to the different window even if we apply the same window
+        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+        // correct window range.
+        val subtractExpr =
+        PreciseTimestampConversion(
+          Subtract(PreciseTimestampConversion(
+            // FIXME: better handling of window.end
+            GetStructField(windowTime.windowColumn, 1),
+            windowTime.dataType, LongType), Literal(1L)),
+          LongType,
+          windowTime.dataType)
+
+        // FIXME: Can there already be a window_time column? Will this lead to conflict?

Review Comment:
   This is a valid concern, although we do the same for window() as well.
   
   E.g. Spark tends to name the column produced by a function as the function name with its entire parameter list. (Unless we alias the result of the function explicitly, it should be something like `window(event_time, '5 seconds')`, but we just give the result column the name `window`.)
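   
   A tiny illustration of that naming behaviour (editorial sketch; assumes a local SparkSession, not taken from the PR):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{upper, window}
   
   val spark = SparkSession.builder().master("local[*]").appName("column-naming").getOrCreate()
   import spark.implicits._
   
   val df = Seq(("a", "2021-01-01 00:00:00")).toDF("id", "event_time")
   
   // Most functions: the result column is named after the full call, e.g. "upper(id)".
   df.select(upper($"id")).printSchema()
   // Time window: the result column is simply named "window".
   df.select(window($"event_time", "5 seconds")).printSchema()
   ```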



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .remove(TimeWindow.marker)
+          .remove(SessionWindow.marker)
+          .build()
+
+        val attr = AttributeReference(
+          "window_time", windowTime.dataType, metadata = newMetadata)()
+
+        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+        // it is, it is going to be bound to the different window even if we apply the same window
+        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+        // correct window range.
+        val subtractExpr =
+        PreciseTimestampConversion(
+          Subtract(PreciseTimestampConversion(
+            // FIXME: better handling of window.end

Review Comment:
   I guess this is OK as long as we don't make a change to the window struct. (And technically we can't, due to backward compatibility.)



##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3777,6 +3777,23 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+  /**
+   * Extracts the event time from the window column of a record produced by window aggregation

Review Comment:
   I may need to be strict here to avoid further confusion based on the comment. We do not have a window aggregation operator. Technically speaking, the window function is effectively a TVF like explode(), and we apply "normal" aggregation against the grouped Dataset whose grouping contains the window.
   
   That said, `produced by window aggregation` seems unnecessary.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	2
+        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	2021-01-01 00:09:59.999999	1
+        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	1
+  """,
+  group = "datetime_funcs",
+  since = "3.3.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  // FIXME: Should we check that windowColumn is a correct one - generated by
+  // a window or session_window aggregation? Do we need to check for the corresponding marker?
+
+  override def child: Expression = windowColumn
+  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)
+
+  // FIXME: pick either "start" or "end" explicitly here

Review Comment:
   Let's leave a comment instead; the window struct has start and end columns which have identical data types, hence choosing the first will work.





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002536939


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .remove(TimeWindow.marker)
+          .remove(SessionWindow.marker)
+          .build()
+
+        val attr = AttributeReference(
+          "window_time", windowTime.dataType, metadata = newMetadata)()
+
+        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+        // it is, it is going to be bound to the different window even if we apply the same window
+        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+        // correct window range.
+        val subtractExpr =
+        PreciseTimestampConversion(
+          Subtract(PreciseTimestampConversion(
+            // FIXME: better handling of window.end
+            GetStructField(windowTime.windowColumn, 1),
+            windowTime.dataType, LongType), Literal(1L)),
+          LongType,
+          windowTime.dataType)
+
+        // FIXME: Can there already be a window_time column? Will this lead to conflict?

Review Comment:
   removed the comment







[GitHub] [spark] HeartSaVioR commented on pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38288:
URL: https://github.com/apache/spark/pull/38288#issuecomment-1288224517

   Thanks! Merging to master.




[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1000086884


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.

Review Comment:
   done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {

Review Comment:
   Moved window, session_window and window_time resolution to ResolveTimeWindows.scala



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),
+    ...          window_time(w.window).cast("string").alias("window_time"),
+    ...          "sum").collect()

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.

Review Comment:
   done





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002777786


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,64 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {

Review Comment:
   Btw, if we want to simply retain the limitation, we should add the check to the unsupported ops checker and provide a better error message rather than an unresolved-operator error, as we do for the time window.
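   
   A rough sketch of what such a check could look like (a hypothetical helper, not the actual unsupported ops checker code; it only reuses the WindowTime expression and the collection pattern from the resolution rule quoted in this review):
   
   ```
   // Hypothetical, untested check: fail fast with an explicit error when a single
   // operator references window_time over more than one distinct window column,
   // instead of leaving the operator unresolved.
   import org.apache.spark.sql.catalyst.expressions.WindowTime
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   
   def checkSingleWindowTime(plan: LogicalPlan): Unit = plan.foreach { p =>
     val windowTimes = p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
     if (windowTimes.size > 1) {
       // Placeholder error type; the real checker has its own error reporting.
       throw new UnsupportedOperationException(
         "Multiple window_time expressions over different window columns in the " +
           "same operator are not supported")
     }
   }
   ```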





[GitHub] [spark] HeartSaVioR closed pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column
URL: https://github.com/apache/spark/pull/38288




[GitHub] [spark] HeartSaVioR commented on pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38288:
URL: https://github.com/apache/spark/pull/38288#issuecomment-1288224352

   My comments are all minor and my follow-up fix will resolve them.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002795345


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   #38361





[GitHub] [spark] cloud-fan commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r998904727


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {

Review Comment:
   can we put it in a new file?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001274667


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string").as("window1"),

Review Comment:
   This is very interesting... @cloud-fan Any idea on this? Do we capture the two expressions as the same one? Otherwise it seems odd that the condition below passes:
   
   ```
   val child = p.children.head
   val windowTimeExpressions =
     p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
   
   if (windowTimeExpressions.size == 1
   ```
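   
   My reading (not verified against the analyzer): WindowTime is a case class, so two window_time calls over the same resolved window column compare equal, and the `.toSet` above collapses them into a single element; only window_time calls over different window columns would make the set larger than one. A standalone sketch of that Set behavior, using a stand-in case class rather than the real expression:
   
   ```
   // Plain Scala illustration (no Spark): equal case-class instances collapse to
   // one element in a Set, so identical window_time calls still satisfy
   // windowTimeExpressions.size == 1.
   case class FakeWindowTime(child: String)
   
   val sameWindow = Seq(FakeWindowTime("window"), FakeWindowTime("window"))
   assert(sameWindow.toSet.size == 1)
   
   val differentWindows = Seq(FakeWindowTime("window1"), FakeWindowTime("window2"))
   assert(differentWindows.toSet.size == 2)
   ```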





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002572937


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,64 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {

Review Comment:
   The actual test code which fails due to the rule is the following:
   
   ```
     test("2 window_time functions on raw window column") {
       val df = Seq(
         ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
       ).toDF("time")
   
       val df2 = df
         .withColumn("time2", expr("time - INTERVAL 5 minutes"))
         .select(window($"time", "10 seconds", "5 seconds").as("window1"), $"time2")
         .select($"window1", window($"time2", "10 seconds", "5 seconds").as("window2"))
   
       /*
         unresolved operator 'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))];
         'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))]
         +- Project [window1#10, window#16 AS window2#15]
            +- Filter isnotnull(cast(time2#6 as timestamp))
               +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), window1#10, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((p
 recisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), window1#10, time2#6]], [window#16, window1#10, time2#6]
                  +- Project [window#11 AS window1#10, time2#6]
                     +- Filter isnotnull(cast(time#4 as timestamp))
                        +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), time#4, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((pre
 cisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), time#4, time2#6]], [window#11, time#4, time2#6]
                           +- Project [time#4, cast(time#4 - INTERVAL '05' MINUTE as string) AS time2#6]
                              +- Project [value#1 AS time#4]
                                 +- LocalRelation [value#1]
        */
       df2.select(
         $"window1.end",
         window_time($"window1"),
         $"window2.end",
         window_time($"window2")
       )
     }
   ```
   
   The reason the above test case fails with an unresolved operator is that we do not resolve the two window_time calls over different windows. If we fix the rule to allow multiple window_time calls over different windows, it should just work.
   
   Btw, this code leads to a cartesian product of windows, but it passes the unsupported operation checker, whereas you'll hit the unsupported operation checker if you place both windows in a single select. Spark's unsupported operation checker is rule-based and not smart enough to capture all such possibilities.
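   
   For completeness, an untested sketch of a possible workaround under the current rule: apply window_time in separate select steps so that each projection carries only one WindowTime expression (continuing from the df2 defined in the snippet above; I have not verified that this actually resolves):
   
   ```
   // Untested: one window_time call per projection, so the size == 1 condition in
   // ResolveWindowTime is satisfied for each operator separately.
   import org.apache.spark.sql.functions.window_time
   
   val step1 = df2.select($"window1", $"window2", window_time($"window1").as("time1"))
   val step2 = step1.select($"time1", window_time($"window2").as("time2"), $"window2.end")
   ```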





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1000120218


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   Alex and I have discussed the complicated corner cases that come up once we enable this... It should be totally feasible from SQL's point of view, but tricky in the context of streaming.
   
   Btw, I just figured out the simplest rationale for allowing only one. We reserve the output column name for the function (as "window_time"), the same as we do for the window()/session_window() functions. Otherwise we would have to assign the resulting column a name like `window_time(window)`. So it seems to be a matter of whether we want to give the window-family functions this special treatment or not.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999155434


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.

Review Comment:
   ```suggestion
           the column for computed results.
   
   ```





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002777626


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string").as("window1"),

Review Comment:
   Although the test is failing now, it's failing due to the time window rule and the corresponding unsupported ops checker. Is that intentional? I guess we want to test the case of multiple window_time calls.
   
   Refer to this comment: https://github.com/apache/spark/pull/38288#discussion_r1002572937





[GitHub] [spark] HeartSaVioR commented on pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38288:
URL: https://github.com/apache/spark/pull/38288#issuecomment-1365840718

   I'm sorry, I was on vacation - you're right, we seem to be missing the pruning, and we also seem to miss the same for session window. My bad.
   
   I've submitted PRs separately for both cases. Please take a look.
   * https://github.com/apache/spark/pull/39245 (session_window, separated out to make backport easy)
   * https://github.com/apache/spark/pull/39247 (window_time)




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001755349


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   The test case should construct two different windows and call window_time on each window in order to "fail".





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999156147


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),
+    ...          window_time(w.window).cast("string").alias("window_time"),
+    ...          "sum").collect()

Review Comment:
   ```suggestion
       >>> w.select(
       ...     w.window.end.cast("string").alias("end"),
       ...     window_time(w.window).cast("string").alias("window_time"),
       ...     "sum"
       ... ).show()
   ```









[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002787441


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   @cloud-fan 
   Let me propose the fix in a separate PR. I have a fix, but it may be better to put it in a separate PR so the comments can be addressed easily.





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001081367


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   This matches the condition in the other window resolution rules above. I added a test which has 2 window_time calls in the select and it passes. I admit I am not well versed in the query plan code.





[GitHub] [spark] rxin commented on pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
rxin commented on PR #38288:
URL: https://github.com/apache/spark/pull/38288#issuecomment-1364413331

   Also we need to add proper SQL tests.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002574932


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   I left a comment on how to work around the ops checker error and still hit this condition.





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001065851


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	2
+        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	2021-01-01 00:09:59.999999	1
+        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	1
+  """,
+  group = "datetime_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  // FIXME: Should we check that windowColumn is a correct one - generated by

Review Comment:
   removed





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002731494


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string").as("window1"),

Review Comment:
   Thanks for figuring this out. Redid the test





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002731442


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,52 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(

Review Comment:
   done





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999155104


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.

Review Comment:
   ```suggestion
       :class:`pyspark.sql.types.StructType`.
   
   ```
   
   Otherwise documentation rendering is broken IIRC.



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0

Review Comment:
   ```suggestion
       .. versionadded:: 3.4.0
   
   ```



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.

Review Comment:
   ```suggestion
           The window column of a window aggregate records.
   
   ```





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999154595


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the

Review Comment:
   I would do something like this to make it clear which part is code:
   ```suggestion
       computed as ``window_time(window)`` and are ``window.end - lit(1).alias("microsecond")`` (as microsecond is the
   ```





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1000989244


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	2
+        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	2021-01-01 00:09:59.999999	1
+        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	1
+  """,
+  group = "datetime_funcs",
+  since = "3.3.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  // FIXME: Should we check that windowColumn is a correct one - generated by
+  // a window or session_window aggregation? Do we need to check for the corresponding marker?
+
+  override def child: Expression = windowColumn
+  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)
+
+  // FIXME: pick either "start" or "end" explicitly here

Review Comment:
   removed.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
+        windowTimeExpressions.head.windowColumn.resolved &&
+        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val windowTime = windowTimeExpressions.head
+
+        val metadata = windowTime.windowColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        if (!metadata.contains(TimeWindow.marker) &&
+          !metadata.contains(SessionWindow.marker)) {
+          // FIXME: error framework?
+          throw new AnalysisException("The input is not a correct window column!")
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .remove(TimeWindow.marker)
+          .remove(SessionWindow.marker)
+          .build()
+
+        val attr = AttributeReference(
+          "window_time", windowTime.dataType, metadata = newMetadata)()
+
+        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+        // it is, it is going to be bound to the different window even if we apply the same window
+        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+        // correct window range.
+        val subtractExpr =
+        PreciseTimestampConversion(
+          Subtract(PreciseTimestampConversion(
+            // FIXME: better handling of window.end

Review Comment:
   removed
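
The `PreciseTimestampConversion`/`Subtract` pair in the hunk above is the internal form of a computation that can also be written by hand against the window struct. A minimal Scala sketch of that manual equivalent (not part of the PR; it assumes an active `SparkSession` named `spark` and reuses the sample timestamps from the test suite in this thread):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Sample input: events bucketed into 10-second windows by the built-in window().
val windowed = Seq("2016-03-27 19:38:18", "2016-03-27 19:39:25")
  .toDF("time")
  .select(window($"time", "10 seconds").as("window"))

// window.end is the exclusive upper bound, so subtract one microsecond
// (the smallest supported event-time precision) to stay inside the window.
val withEventTime = windowed.select(
  $"window",
  ($"window.end" - expr("INTERVAL 1 MICROSECOND")).as("event_time"))
```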



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(window_column) - Extract the time value from time/session window column which can be used for event time value of window.
+      The extracted time is (window.end - 1) which reflects the fact that the aggregating
+      windows have exclusive upper bound - [start, end)
+      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
+  """,
+  arguments = """
+    Arguments:
+      * window_column - The column representing time/session window.
+  """,
+  examples = """
+    Examples:
+      > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	2
+        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	2021-01-01 00:09:59.999999	1
+        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	2021-01-01 00:04:59.999999	1
+  """,
+  group = "datetime_funcs",
+  since = "3.3.0")
+// scalastyle:on line.size.limit line.contains.tab
+case class WindowTime(windowColumn: Expression)
+  extends UnaryExpression
+    with ImplicitCastInputTypes
+    with Unevaluable
+    with NonSQLExpression {
+
+  // FIXME: Should we check that windowColumn is a correct one - generated by

Review Comment:
   ResolveWindowTime does check the marker. Removed.



##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3777,6 +3777,23 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+  /**
+   * Extracts the event time from the window column of a record produced by window aggregation

Review Comment:
   Removed 'operator'. 
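
For context, a short usage sketch of the Scala `window_time` function documented in the hunk above (assumes a `SparkSession` named `spark`; the data mirrors the SQL example in the `@ExpressionDescription` quoted earlier in this thread):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Five-minute tumbling-window aggregation; grouping by window() yields a
// struct column named "window" with an inclusive start and exclusive end.
val counts = Seq("2021-01-01 00:00:00", "2021-01-01 00:04:30", "2021-01-01 00:06:00")
  .toDF("time")
  .groupBy(window($"time", "5 minutes"))
  .count()

// window_time recovers a usable event time (window.end - 1 microsecond),
// e.g. 2021-01-01 00:04:59.999999 for the first five-minute window.
val withEventTime = counts.select(
  $"window.start".as("start"),
  $"window.end".as("end"),
  window_time($"window").as("event_time"),
  $"count")
```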





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001274667


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string").as("window1"),

Review Comment:
   This is very interesting... @cloud-fan Any idea on this? Do we capture the expression as the same? Otherwise it sounds a bit odd to pass the condition below:
   
   ```
   val child = p.children.head
   val windowTimeExpressions =
     p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
   
   if (windowTimeExpressions.size == 1
   ```





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r999158461


##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The column window values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and are window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator - of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate records.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),

Review Comment:
   Can we add some explanation about what this example is doing? e.g.)
   
   ```
       >>> import datetime
       >>> df = spark.createDataFrame(
       ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
       ... ).toDF("date", "val")
   
       Define a window which groups the data for every five seconds, and aggregate
       them .. blah blah
   
       >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
   
       window_time blah blah
   
       >>> w.select( ...
   ```





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1001749087


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string").as("window1"),

Review Comment:
   OK this was pretty straightforward... Refer to the definition of WindowTime.
   
   `case class WindowTime(windowColumn: Expression)`
   
   We apply `.toSet` to collect the instances with deduplication, so as long as the windowColumn is resolved to the same expression, they are considered to be the same.
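
A tiny, self-contained Scala sketch of that equality behaviour, using simplified stand-ins rather than the real Catalyst classes:

```scala
// Simplified stand-ins for Expression / WindowTime, only to show that
// case-class structural equality lets a Set collapse duplicates.
case class Attr(name: String)
case class WindowTimeExpr(windowColumn: Attr)

val exprs = Seq(WindowTimeExpr(Attr("window")), WindowTimeExpr(Attr("window")))

// Both instances compare equal, so the Set keeps a single element -- which is
// why the `windowTimeExpressions.size == 1` check in ResolveWindowTime passes
// even when window_time is projected twice over the same resolved column.
assert(exprs.toSet.size == 1)
```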





[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002539045


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ * */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&

Review Comment:
   Modified the test. Indeed the scenario fails with the unsupported ops checker error.


