Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/17 00:12:36 UTC

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973478091


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but

Review Comment:
   'Alternatively, you can pickle the data ...' - instead say
   
   'For such cases, the user should pickle the data into BinaryType. Note that this approach may be sensitive to backward and forward compatibility issues of Python pickle, and Spark cannot guarantee compatibility.'
   
   though I think you could drop the note as that is orthogonal to Spark.
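
A minimal sketch of the pickling approach suggested above, assuming a state schema with a single BinaryType field. The helper names `pack_state` and `unpack_state` are hypothetical and not part of PySpark; a `collections.deque` stands in for a Python type that has no Spark SQL equivalent.

```python
import pickle
from collections import deque


def pack_state(obj):
    """Serialize a Python object into a one-element tuple holding bytes.

    The tuple shape is an assumption: it mirrors a state schema with a
    single BinaryType field.
    """
    return (pickle.dumps(obj),)


def unpack_state(state_tuple):
    """Restore the Python object from the one-element state tuple."""
    (payload,) = state_tuple
    return pickle.loads(payload)


# A bounded deque is a native Python class with no Spark SQL type.
recent = deque([1, 2, 3], maxlen=3)
state_tuple = pack_state(recent)
restored = unpack_state(state_tuple)
```

The payload is only readable by a Python version whose pickle format is compatible with the one that wrote it, which is the compatibility concern raised in the note.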



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
+        it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
+        does not guarantee the compatibility.
+
+        The length of each element in both input and returned value, `pandas.DataFrame`, can be

Review Comment:
   'The size of each DataFrame in both the input and output ...'
   
   'The number of DataFrames in both the input and output can also be arbitrary.'



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there

Review Comment:
   I do not like '->' here - this is supposed to be text. How about:
   
   'The function will be invoked first for all input groups and then for all timed-out states, where the input data will be null.'



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,

Review Comment:
   'all columns are passed together as `pandas.DataFrame` ...' - this is confusing - of course all columns will be passed together. How about:
   
   'Each group is passed as one or more `pandas.DataFrame` objects, each holding a batch of the group's records with all columns packed into the DataFrame.'



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all

Review Comment:
   'Note that the user function should loop through and process all elements in the iterator. The user function should not make a guess of the number of elements in the iterator.'
   
   - Why? This sounds like the user *must* process all iterator entries or otherwise something bad would happen. I would reword this to indicate that the grouped data could be split into multiple entries:
   
   'Note that the group data may be split into multiple Iterator records and the user function should not assume that it receives a single record.'
   
   I would still suggest we have a design discussion about splitting groups unnecessarily, as I believe we should not do this.
   
   



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every

Review Comment:
   remove 'repeatedly' - 'for each group' implies that.



##########
python/pyspark/sql/pandas/_typing/__init__.pyi:
##########
@@ -256,6 +258,10 @@ PandasGroupedMapFunction = Union[
     Callable[[Any, DataFrameLike], DataFrameLike],
 ]
 
+PandasGroupedMapFunctionWithState = Callable[
+    [Any, Iterable[DataFrameLike], GroupStateImpl], Iterable[DataFrameLike]

Review Comment:
   Can the type be GroupState, without the 'Impl'? It looks bad in a public API.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not

Review Comment:
   'Non-StructType types, e.g. user-defined or native Python types, are not supported.'



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
+        it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
+        does not guarantee the compatibility.
+
+        The length of each element in both input and returned value, `pandas.DataFrame`, can be
+        arbitrary. The length of iterator in both input and returned value can be also arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function to be called on every group. It should takes parameters
+            (key, Iterator[`pandas.DataFrame`], state) and returns Iterator[`pandas.DataFrame`].
+            Note that the type of key is tuple, and the type of state is
+            :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+        outputStructType : :class:`pyspark.sql.types.DataType` or str
+            the type of the output records. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+        stateStructType : :class:`pyspark.sql.types.DataType` or str
+            the type of the user-defined state. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

Review Comment:
   Same here - can you provide an example of the string?



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
+        it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
+        does not guarantee the compatibility.
+
+        The length of each element in both input and returned value, `pandas.DataFrame`, can be
+        arbitrary. The length of iterator in both input and returned value can be also arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function to be called on every group. It should takes parameters
+            (key, Iterator[`pandas.DataFrame`], state) and returns Iterator[`pandas.DataFrame`].
+            Note that the type of key is tuple, and the type of state is
+            :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+        outputStructType : :class:`pyspark.sql.types.DataType` or str
+            the type of the output records. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

Review Comment:
   can you provide an example here of the string?
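
For illustration, a DDL-formatted type string lists `name TYPE` pairs separated by commas. The sketch below shows what such strings might look like for the two schema parameters; the variable names and field names are hypothetical and do not come from the PR.

```python
# Illustrative DDL-formatted schema strings; the field names are made up.
# Each entry is "<column name> <Spark SQL type>", separated by commas.
output_struct_type = "id LONG, countAsString STRING"
state_struct_type = "count LONG"
```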



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation

Review Comment:
   again, I think 'repeatedly' is unnecessary.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in

Review Comment:
   in the returned value



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
+        it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
+        does not guarantee the compatibility.
+
+        The length of each element in both input and returned value, `pandas.DataFrame`, can be
+        arbitrary. The length of iterator in both input and returned value can be also arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function to be called on every group. It should takes parameters

Review Comment:
   it should *take* parameters. ... and return Iterator...



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined

Review Comment:
   ... describing the schema of *the* user-defined state. The value of *the* state will be presented as a tuple and the update should be performed with a tuple.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.

Review Comment:
   Again, can we drop the `Impl` suffix from the state class name?



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and

Review Comment:
   The function takes parameters ... and returns Iterator[...]
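
   For readers of the docstring, a small sketch of a function with this shape could make the contract concrete. It runs with pandas only and does not touch Spark; `SketchState` is a hypothetical stand-in for the state object (its `exists`, `get` and `update` members are assumptions for this sketch, not the PySpark class), and `count_rows` is an illustrative name.

```python
from typing import Any, Iterator, Optional, Tuple

import pandas as pd


class SketchState:
    """Hypothetical stand-in for the per-group state; not the PySpark class."""

    def __init__(self) -> None:
        self._value: Optional[Tuple[Any, ...]] = None

    @property
    def exists(self) -> bool:
        return self._value is not None

    @property
    def get(self) -> Tuple[Any, ...]:
        if self._value is None:
            raise ValueError("state has not been set")
        return self._value

    def update(self, new_value: Tuple[Any, ...]) -> None:
        # The state value is a tuple, and updates are made with a tuple.
        if not isinstance(new_value, tuple):
            raise TypeError("state must be updated with a tuple")
        self._value = new_value


def count_rows(
    key: Tuple[Any, ...],
    pdf_iter: Iterator[pd.DataFrame],
    state: SketchState,
) -> Iterator[pd.DataFrame]:
    # Start from the stored count if there is one, otherwise from zero.
    (count,) = state.get if state.exists else (0,)
    # Consume every element; make no assumption about how many there are.
    for pdf in pdf_iter:
        count += len(pdf)
    state.update((count,))
    # Emit one output frame with the running count for this group.
    yield pd.DataFrame({"id": [key[0]], "count": [count]})
```

   Because `count_rows` is a generator, the state is only updated once the caller consumes the returned iterator, for example with `list(count_rows(("a",), iter(frames), SketchState()))`. An empty input iterator (as in the timeout case described in the docstring) simply leaves the count unchanged.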



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not
+        supported. Alternatively, you can pickle the data and produce the data as BinaryType, but
+        it is tied to the backward and forward compatibility of pickle in Python, and Spark itself
+        does not guarantee the compatibility.
+
+        The length of each element in both input and returned value, `pandas.DataFrame`, can be
+        arbitrary. The length of iterator in both input and returned value can be also arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function to be called on every group. It should takes parameters
+            (key, Iterator[`pandas.DataFrame`], state) and returns Iterator[`pandas.DataFrame`].
+            Note that the type of key is tuple, and the type of state is

Review Comment:
   Note that the type of *the* key is tuple and the type of *the* state is ...



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined

Review Comment:
   returned pandas.DataFrame must ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

