You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/11 12:22:38 UTC

[GitHub] [spark] EnricoMi opened a new pull request, #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

EnricoMi opened a new pull request, #38624:
URL: https://github.com/apache/spark/pull/38624

   ### What changes were proposed in this pull request?
   Add `applyInArrow` method to PySpark `groupBy` and `groupBy.cogroup` to allow for user functions that work on Arrow. Similar to existing `mapInArrow`, this allow
   
   ### Why are the changes needed?
   PySpark allows to transform a `DataFrame` via Pandas and Arrow API:
   ```
   df.mapInArrow(map_arrow, schema="...")
   df.mapInPandas(map_pandas, schema="...")
   ```
   
   For `df.groupBy(...)` and `df.groupBy(...).cogroup(...)`, there is only a Pandas interface, no Arrow interface:
   ```
   df.groupBy("id").applyInPandas(apply_pandas, schema="...")
   ```
   
   Providing a pure Arrow interface allows user code to use **any** Arrow-based data framework, not only Pandas, e.g. Polars:
   ```
   def apply_polars(df: polars.DataFrame) -> polars.DataFrame:
     return df
   
   def apply_arrow(table: pyarrow.Table) -> pyarrow.Table:
     df = polars.from_arrow(table)
     return apply_polars(df).to_arrow()
   
   df.groupBy("id").applyInArrow(apply_arrow, schema="...")
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   This adds method `applyInPandas` to PySpark `groupBy` and `groupBy.cogroup`.
   
   ### How was this patch tested?
   Tested with unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1686008186

   > What's the design philosophy behind having to go through pandas first with possible loss of data and precision, since pandas don't have proper strict type casting, while there is a more efficient path available?
   
   Arrow was considered as an internal format initially, and that's the whole reason why pandas came up first. In fact, the number of pandas users are (much) higher given some stats I get given, and is informally considered as the standard TBH.  It's too late to deprecate/remote pandas API, and switch the standard to Arrow in any event.
   
   > Going through Pandas requires users to install Pandas though they are using a different Arrow-based dataset API.
   
   It will requires the driver side to install both pandas and Arrow. Maybe pandas alone can be missing on executors but I think it's sort of a minor point (since nobody actually complains in the mailing list or JIRA as far as I can see).
   
   > That would require the user to implement what GroupedIterator in Spark does.
   
   Nah, the operation within the function can perform a groupby operation, e.g., `pandas.groupby().apply` then it will do the same.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1780540196

   Guys, just to be extra clear, I am one of many committers here. I am not the decision maker, and technically I am not -1 on this. So let's raise a discussion in dev mailing list (https://spark.apache.org/news/spark-mailing-lists-moving-to-apache.html), and see if people like, or ping other committers if there are some support here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on a diff in pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1126491622


##########
dev/infra/Dockerfile:
##########
@@ -64,7 +64,7 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
 # See more in SPARK-39735
 ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
 
-RUN pypy3 -m pip install numpy 'pandas<=1.5.3' scipy coverage matplotlib
+RUN pypy3 -m pip install numpy pyarrow 'pandas<=1.5.3' scipy coverage matplotlib

Review Comment:
   This is only needed to test Python code in docstrings. However, installing pyarror in pypy is not easy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1687586216

   > > I don't see any reason to oppose this
   > 
   > My main reason is that I would like to avoid adding a bunch of API to address a couple of corner cases. Ideally we should have a set of API as few as possible that handles 99% cases. That's why `mapInArrow` itself is marked as a developer API.
   > - Once we add this, we should also think about adding Arrow version of other pandas friends, e.g., Arrow UDF
   
   
   Ok I can understand that, but in this case we would not be adding complete new APIs with no shared code.
   
   Ideally you would have an API that does MapInArrow and GroupByApplyInArrow, and then for the Pandas use case those APIs are called with an additional step and that's the casting to Pandas. 
   
   I don't see how this will add much bloat to the API. Then you could still mark the arrow APIs as developer API since people won't likely use it directly according to you, but at least the shared API is accessible for the two use cases:
   - Working directly with Arrow libraries (polars/ data fusion)
   - working with Pandas
   
   
   > - The benefit of doing this isn't very significant considering that most of the cases are already covered by `mapInArrow`, and by converting pandas to Arrow.
   
   Just because something somewhat works, doesn't mean we should keep doing it if there is a better and more efficient way.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375549650


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -30,13 +30,15 @@
         PandasGroupedMapFunction,
         PandasGroupedMapFunctionWithState,
         PandasCogroupedMapFunction,
+        ArrowGroupedMapFunction,
+        ArrowCogroupedMapFunction,
     )
     from pyspark.sql.group import GroupedData
 
 
 class PandasGroupedOpsMixin:
     """
-    Min-in for pandas grouped operations. Currently, only :class:`GroupedData`
+    Min-in for Pandas grouped operations. Currently, only :class:`GroupedData`

Review Comment:
   Let's just keep it as `pandas`, and below too.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -354,6 +356,132 @@ def applyInPandasWithState(
         )
         return DataFrame(jdf, self.session)
 
+    def applyInArrow(
+        self, func: "ArrowGroupedMapFunction", schema: Union[StructType, str]
+    ) -> "DataFrame":
+        """
+        Maps each group of the current :class:`DataFrame` using an Arrow udf and returns the result
+        as a `DataFrame`.
+
+        The function should take a `pyarrow.Table` and return another
+        `pyarrow.Table`. Alternatively, the user can pass a function that takes
+        a tuple of `pyarrow.Scalar` grouping key(s) and a `pyarrow.Table`.
+        For each group, all columns are passed together as a `pyarrow.Table`
+        to the user-function and the returned `pyarrow.Table` are combined as a
+        :class:`DataFrame`.
+
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pyarrow.Table`. The column labels of the returned `pyarrow.Table` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pyarrow.Table` can be arbitrary.
+
+        .. versionadded:: 3.4.0

Review Comment:
   4.0.0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1723698807

   @ion-elgreco From my tests, it is not working. It would be good if @HyukjinKwon could validate if the workaround is indeed not working as expected as well


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1396846806


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -495,6 +623,104 @@ def applyInPandas(
         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
         return DataFrame(jdf, self._gd1.session)
 
+    def applyInArrow(
+        self, func: "ArrowCogroupedMapFunction", schema: Union[StructType, str]
+    ) -> "DataFrame":
+        """
+        Applies a function to each cogroup using Arrow and returns the result
+        as a `DataFrame`.
+
+        The function should take two `pyarrow.Table`\\s and return another
+        `pyarrow.Table`. Alternatively, the user can pass a function that takes
+        a tuple of `pyarrow.Scalar` grouping key(s) and the two `pyarrow.Table`\\s.
+        For each side of the cogroup, all columns are passed together as a
+        `pyarrow.Table` to the user-function and the returned `pyarrow.Table` are combined as
+        a :class:`DataFrame`.
+
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pyarrow.Table` can be arbitrary.
+
+        .. versionadded:: 3.4.0

Review Comment:
   Done, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1398214788


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -454,7 +582,7 @@ def applyInPandas(
         the grouping key(s) will be passed as the first argument and the data will be passed as the
         second and third arguments.  The grouping key(s) will be passed as a tuple of numpy data
         types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
-        `pandas.DataFrame` containing all columns from the original Spark DataFrames.
+        `pandas.DataFrame`\\s containing all columns from the original Spark DataFrames.

Review Comment:
   nit: We don't need to escape the letter "s" in docstrings



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1841525581

   That will be 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685688494

   > I get that `cogroup` might not be possible tho. But we can just convert pandas back to arrow batches easily. Is this really required for some scenario? IIRC this is only useful for addressing nested types.
   
   Even with pandas 2.0+ converting between pandas and Arrow is not fully zero copy, so this will add some latency to it. 
   
   And Pandas has some quirks with upcasting, not respecting the original datatypes, null handling, and like you say nested dtypes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375616619


##########
python/pyspark/worker.py:
##########
@@ -306,6 +308,33 @@ def verify_element(elem):
     )
 
 
+def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):

Review Comment:
   can we make `wrap_grouped_map_arrow_udf ` `wrap_cogrouped_map_arrow_udf` and `verify_arrow_table_schema` more consistent with pandas side:
   `wrap_grouped_map_pandas_udf`, `wrap_cogrouped_map_pandas_udf` and `verify_pandas_result`?



##########
python/pyspark/worker.py:
##########
@@ -330,6 +359,97 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
 
 
+def verify_arrow_table_schema(table, assign_cols_by_name, expected_cols_and_types):
+    import pyarrow as pa
+
+    if not isinstance(table, pa.Table):
+        raise TypeError(
+            "Return type of the user-defined function should be "
+            "pyarrow.Table, but is {}".format(type(table))
+        )
+
+    # the types of the fields have to be identical to return type
+    # an empty table can have no columns; if there are columns, they have to match
+    if len(table.columns) != 0 or table.num_rows != 0:

Review Comment:
   nit:
   
   ```suggestion
       if table.num_columns != 0 or table.num_rows != 0:
   ```



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -30,13 +30,15 @@
         PandasGroupedMapFunction,
         PandasGroupedMapFunctionWithState,
         PandasCogroupedMapFunction,
+        ArrowGroupedMapFunction,
+        ArrowCogroupedMapFunction,
     )
     from pyspark.sql.group import GroupedData
 
 
 class PandasGroupedOpsMixin:
     """
-    Min-in for pandas grouped operations. Currently, only :class:`GroupedData`
+    Min-in for Pandas grouped operations. Currently, only :class:`GroupedData`

Review Comment:
   +1, this PR is huge, let's avoid unrelated changes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375549969


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -162,6 +162,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)

Review Comment:
   ```suggestion
       Loads Arrow record batches as :class:`pyarrow.RecordBatch` (one :class:`pyarrow.RecordBatch` per group)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375550772


##########
python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py:
##########
@@ -0,0 +1,301 @@
+#

Review Comment:
   new test cases should be added into `dev/sparktestsupport/modules.py`. Also, we can do it separately but would be nicer if we add `test_parity_arrow_cogrouped_map.py` for Spark connect, e.g., see `pyspark.sql.tests.connect`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1686654192

   > Arrow was considered as an internal format initially, and that's the whole reason why pandas came up first. In fact, the number of pandas users are (much) higher given some stats I get given, and is informally considered as the standard TBH. It's too late to deprecate/remote pandas API, and switch the standard to Arrow in any event.
   
   That's fine, the pandasUDF shouldn't be deprecated, but there should be at least an alternative to only use Arrow. Especially since the ARROW ecosystem is growing and slowly becoming the Defacto standard for transferring data between different interfaces.
   
   > I am open if you'd like to raise a discussion in the dev mailing list, and we can discuss there to reach a consensus - I don't object.
   
   Where is this dev mailing list and how do I raise a discussion there?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1687231741

   > Where is this dev mailing list and how do I raise a discussion there?
   
   See https://spark.apache.org/news/spark-mailing-lists-moving-to-apache.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375550162


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -162,6 +162,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)

Review Comment:
   ```suggestion
       Loads Arrow record batches as ``[[pyarrow.RecordBatch]]`` (one ``[pyarrow.RecordBatch]`` per group)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685484092

   I get that `cogroup` might not be possible tho. But we can just convert pandas back to arrow batches easily. Is this really required for some scenario? IIRC this is only useful for addressing nested types.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1683968169

   Any updates on getting this merged? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] goodwanghan commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "goodwanghan (via GitHub)" <gi...@apache.org>.
goodwanghan commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685705080

   > 
   
   Thanks for the clarification. Actually repartition plus presort should work.
   
   I think technically, this is very doable, the performance should also be decent. But I think this is an essential programming interface that the official pyspark should have (and given you already have `mapInArrow`, `applyInArrow` seems to be a natural expectation from users). It's important also because it is a semantic that is totally independent from pandas. You underlying implementation of pandas udf is all based on arrow, I feel it is not even necessary to call them pandas udfs (I don't expect you to change the names, just to share my opinion)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1716475234

   @EnricoMi @HyukjinKwon Can we get some traction on this PR to either close as `wont-merge` or evaluate what needs to be done so it can be merged? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] goodwanghan commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "goodwanghan (via GitHub)" <gi...@apache.org>.
goodwanghan commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1454933112

   @EnricoMi @HyukjinKwon I think this is a very critical feature that is missing in the current PySpark. Can we consider merging this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1640085866

   @xinrong-meng @HyukjinKwon rebased with master and conflicts resolved


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1688166890

   @HyukjinKwon this may be a misunderstanding on my part regarding the inner works but for the `repartition(grouping_cols).mapInArrow` workaround, wouldn't the [batch size](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#setting-arrow-batch-size) present a problem where we would end up not having the full group available in the Arrow RecordBatch depending on the batch size parameter, for example using the default 10K batch size and the data have more than 10K rows in any partition?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1399129163


##########
python/pyspark/sql/udf.py:
##########
@@ -275,6 +275,22 @@ def returnType(self) -> DataType:
                         "return_type": str(self._returnType_placeholder),
                     },
                 )
+        elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_type(self._returnType_placeholder)
+                except TypeError:

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1841672129

   Around next June 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1841525907

   @HyukjinKwon thanks for merging!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1396553305


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -495,6 +623,104 @@ def applyInPandas(
         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
         return DataFrame(jdf, self._gd1.session)
 
+    def applyInArrow(
+        self, func: "ArrowCogroupedMapFunction", schema: Union[StructType, str]
+    ) -> "DataFrame":
+        """
+        Applies a function to each cogroup using Arrow and returns the result
+        as a `DataFrame`.
+
+        The function should take two `pyarrow.Table`\\s and return another
+        `pyarrow.Table`. Alternatively, the user can pass a function that takes
+        a tuple of `pyarrow.Scalar` grouping key(s) and the two `pyarrow.Table`\\s.
+        For each side of the cogroup, all columns are passed together as a
+        `pyarrow.Table` to the user-function and the returned `pyarrow.Table` are combined as
+        a :class:`DataFrame`.
+
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pyarrow.Table` can be arbitrary.
+
+        .. versionadded:: 3.4.0

Review Comment:
   should be `4.0.0`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685712229

   `mapInArrow` is marked as a developer API, and my initial intention was to avoid adding the arrow version of that everywhere - in theory `mapInArrow` can cover all the cases except cogrouping (I admit I missed this case when we add `mapInArrow`). That's why I am hesitant to add this now.
   
   What about pandas UDF and iteration friends? I think it's too much to add all Arrow versions to address a couple of corner cases and performance:
   
    - The benefit of the performance is even not super critical since we already copy Arrow here and there. In fact, vilna Arrow itself copies when it's converted to something else by default.
   - repartition-mapInArrow will cover grouping case. The only leftover case is cogrouping that has a bit of overhead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1687250743

   > I don't see any reason to oppose this
   
   My main reason is that I would like to avoid adding a bunch of API to address a couple of corner cases. Ideally we should have a set of API as few as possible that handles 99% cases. That's why `mapInArrow` itself is marked as a developer API.
   - Once we add this, we should also think about adding Arrow version of other pandas friends, e.g., Arrow UDF
   - The benefit of doing this isn't very significant considering that most of the cases are already covered by `mapInArrow`, and by converting pandas to Arrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1716592989

   > @EnricoMi @HyukjinKwon Can we get some traction on this PR to either close as `wont-merge` or evaluate what needs to be done so it can be merged? 
   
   Repartition is not working, right? If not then groupByMapInArrow should become available since there is no workaround.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708508134

   @HyukjinKwon This is not matching what I am seeing with is test case. The batch size setting is definitely changing what is available within the `mapInArrow()` call after the repartition by the grouping column.
   
   ```python
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   def udf(b):
       for i in b:
           print(f"Num of rows per partition: {i.num_rows}")
           yield i
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   def udf2(df):
       print(f"Num of rows per group: {len(df)}")
       return df
   
   print("\n")
   print("Using applyInPandas")
   df.groupby("idx").applyInPandas(udf2, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   The output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 20000
   
   
   
   
   Using applyInPandas
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1640086388

   @Kimahriman you mentioned this would fix a 2GB memory limit?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375549814


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -454,7 +581,7 @@ def applyInPandas(
         the grouping key(s) will be passed as the first argument and the data will be passed as the
         second and third arguments.  The grouping key(s) will be passed as a tuple of numpy data
         types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
-        `pandas.DataFrame` containing all columns from the original Spark DataFrames.
+        `pandas.DataFrame`\\s containing all columns from the original Spark DataFrames.

Review Comment:
   why `\\s`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1817524767

   Left comments. The PR looks great, thanks for your contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685602103

   adding @viirya @ueshin @BryanCutler in case you guys have some thought on this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685601694

   Yeah, I meant `df.repartition(grouping_cols).mapInArrow() `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1686009307

   I am open if you'd like to raise a discussion in the dev mailing list, and we can discuss there to reach a consensus - I don't object.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1781147252

   I have just sent this: https://lists.apache.org/thread/w4qf1724f8mckglxghzrmgbrzb2zlth6


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375550034


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -162,6 +162,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)
+    and serializes `[([pa.RecordBatch], arrow_type)]`.

Review Comment:
   ```suggestion
       and serializes ``[([pyarrow.RecordBatch], arrow_type)]``.
   ```



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -162,6 +162,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)

Review Comment:
   ```suggestion
       Loads Arrow record batches as :class:`pyarrow.RecordBatch` (one :class:`pyarrow.RecordBatch` per group)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1784278418

   cc @viirya @BryanCutler @xinrong-meng @ueshin @zhengruifeng 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1384512107


##########
python/pyspark/worker.py:
##########
@@ -306,6 +308,33 @@ def verify_element(elem):
     )
 
 
+def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):

Review Comment:
   Consistent in the sense of naming or what it does?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1662931308

   Looking forward to see this PR getting merged :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Kimahriman commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1640248937

   > @Kimahriman you mentioned this would fix a 2GB memory limit?
   
   Yeah combined with the new setting `spark.sql.execution.arrow.useLargeVarTypes` it should allow getting around a 2GiB limit on a single string/binary column being returned from a `applyInPandas` function


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on a diff in pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1202124485


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -147,6 +147,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)
+    and serializes `[([pa.RecordBatch], arrow_type)]`.
+
+    Parameters
+    ----------
+    assign_cols_by_name : bool
+        If True, then DataFrames will get columns by name
+    """
+
+    def __init__(self, assign_cols_by_name):
+        super(ArrowStreamGroupUDFSerializer, self).__init__()
+        self._assign_cols_by_name = assign_cols_by_name
+
+    def dump_stream(self, iterator, stream):
+        import pyarrow as pa
+
+        # flatten inner list [([pa.RecordBatch], arrow_type)] into [(pa.RecordBatch, arrow_type)]
+        # so strip off inner iterator induced by ArrowStreamUDFSerializer.load_stream
+        batch_iter = [
+            (batch, arrow_type)
+            for batches, arrow_type in iterator  # tuple constructed in wrap_grouped_map_arrow_udf
+            for batch in batches
+        ]
+
+        if self._assign_cols_by_name:
+            batch_iter = [
+                (
+                    pa.RecordBatch.from_arrays(
+                        [batch.column(field.name) for field in arrow_type],
+                        names=[field.name for field in arrow_type],
+                    ),
+                    arrow_type,
+                )
+                for batch, arrow_type in batch_iter
+            ]

Review Comment:
   Thanks for highlighting this, I have changed this to a generator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1477426095

   CC @xinrong-meng 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685933606

   > I get that `cogroup` might not be possible tho. But we can just convert pandas back to arrow batches easily. Is this really required for some scenario? IIRC this is only useful for addressing nested types.
   
   Going through Pandas requires users to install Pandas though they are using a different Arrow-based dataset API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685936615

   > qq, can't we workaround by `df.repartitionByExpression().mapInArrow()` for `groupby` case?
   
   That would require the user to implement what `GroupedIterator` in Spark does.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685790842

   > `mapInArrow` is marked as a developer API, and my initial intention was to avoid adding the arrow version of that everywhere - in theory `mapInArrow` can cover all the cases except cogrouping (I admit I missed this case when we add `mapInArrow`). That's why I am hesitant to add this now.
   > 
   > What about pandas UDF and iteration friends? I think it's too much to add all Arrow versions to address a couple of corner cases and performance:
   > 
   >  - The benefit of the performance is even not super critical since we already copy Arrow here and there. In fact, vilna Arrow itself copies when it's converted to something else by default.
   > - repartition-mapInArrow will cover grouping case. The only leftover case is cogrouping that has a bit of overhead.
   
   What's the design philosophy behind having to go through pandas first with possible loss of data and precision, since pandas don't have proper strict type casting, while there is a more efficient path available?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1692710197

   No, the whole group is in the memory in case of `groupby.applayInPandas`. They are same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1412244376


##########
python/pyspark/worker.py:
##########
@@ -306,6 +308,33 @@ def verify_element(elem):
     )
 
 
+def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):

Review Comment:
   @zhengruifeng please see my question above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1399122964


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -454,7 +582,7 @@ def applyInPandas(
         the grouping key(s) will be passed as the first argument and the data will be passed as the
         second and third arguments.  The grouping key(s) will be passed as a tuple of numpy data
         types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
-        `pandas.DataFrame` containing all columns from the original Spark DataFrames.
+        `pandas.DataFrame`\\s containing all columns from the original Spark DataFrames.

Review Comment:
   Removed this unrelated change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1311784451

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375549814


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -454,7 +581,7 @@ def applyInPandas(
         the grouping key(s) will be passed as the first argument and the data will be passed as the
         second and third arguments.  The grouping key(s) will be passed as a tuple of numpy data
         types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
-        `pandas.DataFrame` containing all columns from the original Spark DataFrames.
+        `pandas.DataFrame`\\s containing all columns from the original Spark DataFrames.

Review Comment:
   why `\\s`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1781107616

   > Guys, just to be extra clear, I am one of many committers here. I am not the decision maker, and technically I am not -1 on this. So let's raise a discussion in dev mailing list (https://spark.apache.org/news/spark-mailing-lists-moving-to-apache.html), and see if people like, or ping other committers if there are some support here.
   
   I personally have no clue how to use that archaic website


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1781124306

   I have sent an e-mail, but it doesn't get through. I keep trying.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1399128877


##########
python/pyspark/sql/udf.py:
##########
@@ -295,6 +311,20 @@ def returnType(self) -> DataType:
                         "return_type": str(self._returnType_placeholder),
                     },
                 )
+        elif self.evalType == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_type(self._returnType_placeholder)
+                except TypeError:
+                    raise NotImplementedError(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "igorghi (via GitHub)" <gi...@apache.org>.
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708893876

   Adding a `sortWithinPartitions("idx")` also does not result in a viable workaround given the results. That only "kinda" works when all partitions/groups have the same size, and we know that beforehand, so we can set the batch size to that value; obviously, this is not a reasonable workflow.
   
   ```python
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   def udf(b):
       for i in b:
           print(f"Num of rows per partition: {i.num_rows}")
           print(f"Unique idx: {i.to_pandas().idx.unique()}")
           yield i
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   ```
   
   Output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000                                    (0 + 1) / 1]
   Unique idx: [1]
   Num of rows per partition: 10000
   Unique idx: [1]
   Num of rows per partition: 10000
   Unique idx: [2]
   Num of rows per partition: 10000
   Unique idx: [2]
   Num of rows per partition: 10000
   Unique idx: [3]
   Num of rows per partition: 10000
   Unique idx: [3]
   Num of rows per partition: 10000
   Unique idx: [4]
   Num of rows per partition: 10000
   Unique idx: [4]
   Num of rows per partition: 10000
   Unique idx: [5]
   Num of rows per partition: 10000
   Unique idx: [5]
   Num of rows per partition: 10000
   Unique idx: [6]
   Num of rows per partition: 10000
   Unique idx: [6]
   Num of rows per partition: 10000
   Unique idx: [7]
   Num of rows per partition: 10000
   Unique idx: [7]
   Num of rows per partition: 10000
   Unique idx: [8]
   Num of rows per partition: 10000
   Unique idx: [8]
   Num of rows per partition: 10000
   Unique idx: [9]
   Num of rows per partition: 10000
   Unique idx: [9]
   Num of rows per partition: 10000
   Unique idx: [10]
   Num of rows per partition: 10000
   Unique idx: [10]
                                                                                   
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Unique idx: [1]
   Num of rows per partition: 20000
   Unique idx: [2]
   Num of rows per partition: 20000
   Unique idx: [3]
   Num of rows per partition: 20000
   Unique idx: [4]
   Num of rows per partition: 20000
   Unique idx: [5]
   Num of rows per partition: 20000
   Unique idx: [6]
   Num of rows per partition: 20000
   Unique idx: [7]
   Num of rows per partition: 20000
   Unique idx: [8]
   Num of rows per partition: 20000
   Unique idx: [9]
   Num of rows per partition: 20000
   Unique idx: [10]
   
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Unique idx: [1 2]
   Num of rows per partition: 30000
   Unique idx: [2 3]
   Num of rows per partition: 30000
   Unique idx: [4 5]
   Num of rows per partition: 30000
   Unique idx: [5 6]
   Num of rows per partition: 30000
   Unique idx: [7 8]
   Num of rows per partition: 30000
   Unique idx: [8 9]
   Num of rows per partition: 20000
   Unique idx: [10]
   ```
   
   Adding randomness to the groups makes it clear
   
   ```python
   import random
   
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           if random.choice([True, False]):
               data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   def udf(b):
       for i in b:
           print(f"Num of rows per partition: {i.num_rows}")
           print(f"Unique idx: {i.to_pandas().idx.unique()}")
           yield i
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   def udf2(df):
       print(f"Num of rows per group: {len(df)}")
       print(f"Unique idx: {df.idx.unique()}")
       return df
   
   print("\n")
   print("Using applyInPandas")
   df.groupby("idx").applyInPandas(udf2, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   Output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000
   Unique idx: [1 2]
   Num of rows per partition: 10000
   Unique idx: [2 3]
   Num of rows per partition: 10000
   Unique idx: [3 4]
   Num of rows per partition: 10000
   Unique idx: [4 5]
   Num of rows per partition: 10000
   Unique idx: [5 6]
   Num of rows per partition: 10000
   Unique idx: [6 7]
   Num of rows per partition: 10000
   Unique idx: [7 8]
   Num of rows per partition: 10000
   Unique idx: [8 9]
   Num of rows per partition: 10000
   Unique idx: [ 9 10]
   Num of rows per partition: 9982
   Unique idx: [10]
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Unique idx: [1 2 3]
   Num of rows per partition: 20000
   Unique idx: [3 4 5]
   Num of rows per partition: 20000
   Unique idx: [5 6 7]
   Num of rows per partition: 20000
   Unique idx: [7 8 9]
   Num of rows per partition: 19982
   Unique idx: [ 9 10]
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Unique idx: [1 2 3 4]
   Num of rows per partition: 30000
   Unique idx: [4 5 6 7]
   Num of rows per partition: 30000
   Unique idx: [ 7  8  9 10]
   Num of rows per partition: 9982
   Unique idx: [10]
   
   Using applyInPandas
   Num of rows per group: 9972
   Unique idx: [1]
   Num of rows per group: 9978
   Unique idx: [2]
   Num of rows per group: 9953
   Unique idx: [3]
   Num of rows per group: 10004
   Unique idx: [4]
   Num of rows per group: 9956
   Unique idx: [5]
   Num of rows per group: 10036
   Unique idx: [6]
   Num of rows per group: 10037
   Unique idx: [7]
   Num of rows per group: 9898
   Unique idx: [8]
   Num of rows per group: 10011
   Unique idx: [9]
   Num of rows per group: 10137
   Unique idx: [10]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1687611468

   Once we want them to be fully supported properly as a user-facing API, we probably should think about Arrow versions of https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs and https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#pandas-function-apis. That's what I meant by too many API.
   
   Yes, we marked it as a developer API and that `mapInArrow` can cover most of direct Arrow usecases. That's why I suggest to use that for a workaround such as `repartition(grouping_cols).mapInArrow`. Only exception that `mapInArrow` can't cover cogrouping. In this case, you can use somewhat slower `mapInPandas`. And this and only this is the benefit Arrow versions would bring.
   
   To be extra clear,
   - Once we add one, we should also think about all other variants. From what I understood, your argument applies to the all variants.
    - The only benefit of adding this API are: direct Arrow usage with cogrouping (in which you can work around with `mapInPandas`)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685483237

   qq, can't we workaround by `df.repartitionByExpression().mapInArrow()` for `groupby` case?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] goodwanghan commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "goodwanghan (via GitHub)" <gi...@apache.org>.
goodwanghan commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685556987

   > qq, can't we workaround by `df.repartitionByExpression().mapInArrow()` for `groupby` case?
   
   Hi @HyukjinKwon i understand what you mean, I am curious if df.repartition will also work?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by GitBox <gi...@apache.org>.
EnricoMi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1335369584

   @HyukjinKwon what do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Kimahriman commented on a diff in pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1148363009


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -341,6 +343,132 @@ def applyInPandasWithState(
         )
         return DataFrame(jdf, self.session)
 
+    def applyInArrow(
+        self, func: "ArrowGroupedMapFunction", schema: Union[StructType, str]
+    ) -> "DataFrame":
+        """
+        Maps each group of the current :class:`DataFrame` using an Arrow udf and returns the result
+        as a `DataFrame`.
+
+        The function should take a `pyarrow.Table` and return another
+        `pyarrow.Table`. Alternatively, the user can pass a function that takes
+        a tuple of `pyarrow.Scalar` grouping key(s) and a `pyarrow.Table`.
+        For each group, all columns are passed together as a `pyarrow.Table`
+        to the user-function and the returned `pyarrow.Table` are combined as a
+        :class:`DataFrame`.
+
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pyarrow.Table`. The column labels of the returned `pyarrow.Table` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pandas.DataFrame` can be arbitrary.

Review Comment:
   ```suggestion
           The length of the returned `pyarrow.Table` can be arbitrary.
   ```



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -147,6 +147,49 @@ def wrap_and_init_stream():
         return super(ArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
 
 
+class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializes pyarrow.RecordBatch data with Arrow streaming format.
+
+    Loads Arrow record batches as `[[pa.RecordBatch]]` (one `[pa.RecordBatch]` per group)
+    and serializes `[([pa.RecordBatch], arrow_type)]`.
+
+    Parameters
+    ----------
+    assign_cols_by_name : bool
+        If True, then DataFrames will get columns by name
+    """
+
+    def __init__(self, assign_cols_by_name):
+        super(ArrowStreamGroupUDFSerializer, self).__init__()
+        self._assign_cols_by_name = assign_cols_by_name
+
+    def dump_stream(self, iterator, stream):
+        import pyarrow as pa
+
+        # flatten inner list [([pa.RecordBatch], arrow_type)] into [(pa.RecordBatch, arrow_type)]
+        # so strip off inner iterator induced by ArrowStreamUDFSerializer.load_stream
+        batch_iter = [
+            (batch, arrow_type)
+            for batches, arrow_type in iterator  # tuple constructed in wrap_grouped_map_arrow_udf
+            for batch in batches
+        ]
+
+        if self._assign_cols_by_name:
+            batch_iter = [
+                (
+                    pa.RecordBatch.from_arrays(
+                        [batch.column(field.name) for field in arrow_type],
+                        names=[field.name for field in arrow_type],
+                    ),
+                    arrow_type,
+                )
+                for batch, arrow_type in batch_iter
+            ]

Review Comment:
   Are these list comprehensions going to materialize the entire result set before actually sending anything back to the JVM?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1731949210

   @HyukjinKwon since @igorghi has shown with his tests it's not possible to use repartition().mapInArrow to mimic groupbyApply, would it now make sense to add groupbyApplyInArrow?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1375549715


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -354,6 +356,132 @@ def applyInPandasWithState(
         )
         return DataFrame(jdf, self.session)
 
+    def applyInArrow(
+        self, func: "ArrowGroupedMapFunction", schema: Union[StructType, str]
+    ) -> "DataFrame":
+        """
+        Maps each group of the current :class:`DataFrame` using an Arrow udf and returns the result
+        as a `DataFrame`.
+
+        The function should take a `pyarrow.Table` and return another
+        `pyarrow.Table`. Alternatively, the user can pass a function that takes
+        a tuple of `pyarrow.Scalar` grouping key(s) and a `pyarrow.Table`.
+        For each group, all columns are passed together as a `pyarrow.Table`
+        to the user-function and the returned `pyarrow.Table` are combined as a
+        :class:`DataFrame`.
+
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pyarrow.Table`. The column labels of the returned `pyarrow.Table` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pyarrow.Table` can be arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function that takes a `pyarrow.Table` and outputs a
+            `pyarrow.Table`, or that takes one tuple (grouping keys) and a
+            `pyarrow.Table` and outputs a `pyarrow.Table`.
+        schema : :class:`pyspark.sql.types.DataType` or str
+            the return type of the `func` in PySpark. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Examples
+        --------
+        >>> from pyspark.sql.functions import ceil
+        >>> import pyarrow  # doctest: +SKIP
+        >>> import pyarrow.compute as pc  # doctest: +SKIP
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))  # doctest: +SKIP
+        >>> def normalize(table):
+        ...     v = table.column("v")
+        ...     norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+        ...     return table.set_column(1, "v", norm)
+        >>> df.groupby("id").applyInArrow(
+        ...     normalize, schema="id long, v double").show()  # doctest: +SKIP
+        +---+-------------------+
+        +---+-------------------+
+        | id|                  v|
+        +---+-------------------+
+        |  1|-0.7071067811865475|
+        |  1| 0.7071067811865475|
+        |  2|-0.8320502943378437|
+        |  2|-0.2773500981126146|
+        |  2| 1.1094003924504583|
+        +---+-------------------+
+
+        Alternatively, the user can pass a function that takes two arguments.
+        In this case, the grouping key(s) will be passed as the first argument and the data will
+        be passed as the second argument. The grouping key(s) will be passed as a tuple of Arrow
+        scalars types, e.g., `pyarrow.Int32Scalar` and `pyarrow.FloatScalar`. The data will still
+        be passed in as a `pyarrow.Table` containing all columns from the original Spark DataFrame.
+        This is useful when the user does not want to hardcode grouping key(s) in the function.

Review Comment:
   Should add a newline here; otherwise, the doc would be malformed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ion-elgreco commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685039400

   @dongjoon-hyun @zhengruifeng @allisonwang-db @xinrong-meng @HyukjinKwon
   Are there any updates on this PR? This would be a very useful feature for scaling other data frame libraries that use arrow with spark.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1685712780

   > And Pandas has some quirks with upcasting, not respecting the original datatypes, null handling, and like you say nested dtypes.
   
   PyArrow itself also has its own type coercion. It's just a matter of ruling the type coercion so I don't buy this argument.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1811676307

   cc @ueshin and @xinrong-meng for review if you find some time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1398218847


##########
python/pyspark/sql/udf.py:
##########
@@ -295,6 +311,20 @@ def returnType(self) -> DataType:
                         "return_type": str(self._returnType_placeholder),
                     },
                 )
+        elif self.evalType == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_type(self._returnType_placeholder)
+                except TypeError:
+                    raise NotImplementedError(

Review Comment:
   Shall we raise `PySparkNotImplementedError` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #38624:
URL: https://github.com/apache/spark/pull/38624#discussion_r1398218743


##########
python/pyspark/sql/udf.py:
##########
@@ -275,6 +275,22 @@ def returnType(self) -> DataType:
                         "return_type": str(self._returnType_placeholder),
                     },
                 )
+        elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_type(self._returnType_placeholder)
+                except TypeError:

Review Comment:
   Shall we raise `PySparkNotImplementedError` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup
URL: https://github.com/apache/spark/pull/38624


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1837692882

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1841515252

   @HyukjinKwon which spark release will we see this feature? :D 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]

Posted by "ion-elgreco (via GitHub)" <gi...@apache.org>.
ion-elgreco commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1841527153

   @EnricoMi do you know by any chance when that is targeted for?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org