Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/20 03:00:25 UTC

[GitHub] [spark] alex-balikov commented on a diff in pull request #38288: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

alex-balikov commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1000086884


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.

Review Comment:
   done
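
For illustration, the semantics described above can be exercised end to end from
PySpark once this patch is applied. A minimal sketch, assuming an active
SparkSession named `spark` (the data and column names are invented for the
example):

    import datetime
    from pyspark.sql import functions as F

    # One event at 09:00:07 falls into the 5-second window [09:00:05, 09:00:10).
    df = spark.createDataFrame(
        [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["ts", "val"])
    agg = df.groupBy(F.window("ts", "5 seconds")).agg(F.sum("val").alias("sum"))

    # window_time(window) evaluates to window.end - 1 microsecond,
    # i.e. 2016-03-11 09:00:09.999999 for this window.
    agg.select(F.window_time(agg.window).cast("string").alias("event_time"),
               "sum").show(truncate=False)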



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ */
+object ResolveWindowTime extends Rule[LogicalPlan] {

Review Comment:
   Moved window, session_window and window_time resolution to ResolveTimeWindows.scala



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate record.

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate record.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),
+    ...          window_time(w.window).cast("string").alias("window_time"),
+    ...          "sum").collect()

Review Comment:
   done
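
Based on the semantics stated in the docstring, the single 5-second window
covering 09:00:07 is [09:00:05, 09:00:10), so the doctest above should collect
something like the following (sketched from the stated semantics, not copied
from a run):

    [Row(end='2016-03-11 09:00:10', window_time='2016-03-11 09:00:09.999999', sum=1)]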



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate record.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+    Examples
+    --------
+    >>> import datetime
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
+    ... ).toDF("date", "val")
+    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
+    >>> w.select(w.window.end.cast("string").alias("end"),

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the

Review Comment:
   done
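
The one-microsecond subtraction matters because the window end is exclusive:
using window.end directly as an event time would assign a record to the
boundary of the next window. The boundary arithmetic in plain Python (no Spark
needed):

    import datetime

    end = datetime.datetime(2016, 3, 11, 9, 0, 10)          # exclusive window end
    event_time = end - datetime.timedelta(microseconds=1)   # 09:00:09.999999
    # 09:00:09.999999 still falls inside [09:00:05, 09:00:10); 09:00:10 does not.
    assert event_time < end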



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.

Review Comment:
   done



##########
python/pyspark/sql/functions.py:
##########
@@ -4884,6 +4884,42 @@ def check_string_field(field, fieldName):  # type: ignore[no-untyped-def]
         return _invoke_function("window", time_col, windowDuration)
 
 
+def window_time(
+    windowColumn: "ColumnOrName",
+) -> Column:
+    """Computes the event time from a window column. The window column's values are produced
+    by window aggregating operators and are of type
+    StructType { start: TimestampType, end: TimestampType } where start is inclusive and
+    end is exclusive. The event time of records produced by window aggregating operators can be
+    computed as window_time(window) and is window.end - 1 microsecond (as microsecond is the
+    minimal supported event time precision).
+    The window column must be one produced by a window aggregating operator, i.e. of type
+    :class:`pyspark.sql.types.StructType`.
+    .. versionadded:: 3.4.0
+    Parameters
+    ----------
+    windowColumn : :class:`~pyspark.sql.Column`
+        The window column of a window aggregate record.
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.

Review Comment:
   done
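
The Analyzer comment in this PR notes that the correct window time for further
aggregations is window.end - 1, which is the main use case for this function:
feeding the output of one windowed aggregation into another. A sketch of that
pattern, assuming a SparkSession named `spark` (batch for brevity; a streaming
variant would also set watermarks):

    import datetime
    from pyspark.sql import functions as F

    df = spark.createDataFrame(
        [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1),
         (datetime.datetime(2016, 3, 11, 9, 0, 12), 2)], ["ts", "val"])

    # First-level aggregation into 5-second windows.
    small = df.groupBy(F.window("ts", "5 seconds")).agg(F.sum("val").alias("sum"))

    # Re-aggregate into 10-second windows, using window_time as the event time.
    big = (small
           .withColumn("event_time", F.window_time("window"))
           .groupBy(F.window("event_time", "10 seconds"))
           .agg(F.sum("sum").alias("sum")))
    big.show(truncate=False)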



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

