Posted to reviews@spark.apache.org by "itholic (via GitHub)" <gi...@apache.org> on 2023/09/04 08:58:46 UTC

[GitHub] [spark] itholic opened a new pull request, #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

itholic opened a new pull request, #42798:
URL: https://github.com/apache/spark/pull/42798

   
   
   ### What changes were proposed in this pull request?
   
   This PR proposes to support string type columns for `DataFrameGroupBy.sum`.
   
   
   ### Why are the changes needed?
   
   To match the behavior of the latest pandas.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, from now on `DataFrameGroupBy.sum` follows the behavior of the latest pandas, as shown below:
   
   **Test DataFrame**
   ```python
   >>> psdf
      A    B  C      D
   0  1  3.1  a   True
   1  2  4.1  b  False
   2  1  4.1  b  False
   3  2  3.1  a   True
   ```
   
   **Before**
   ```python
   >>> psdf.groupby("A").sum().sort_index()
        B  D
   A
   1  7.2  1
   2  7.2  1
   ```
   
   **After**
   ```python
   >>> psdf.groupby("A").sum().sort_index()
        B   C  D
   A
   1  7.2  ab  1
   2  7.2  ba  1
   ```
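As a rough illustration of the "After" output, here is a stdlib-only Python model of the new semantics. It assumes that string values are concatenated in original row order within each group and that booleans are added as 0/1. The `groupby_sum` helper and the row tuples are hypothetical; this is not the pandas-on-Spark implementation.

```python
# Rows of the test DataFrame above as (A, B, C, D) tuples.
rows = [
    (1, 3.1, "a", True),
    (2, 4.1, "b", False),
    (1, 4.1, "b", False),
    (2, 3.1, "a", True),
]


def groupby_sum(rows):
    """Hypothetical model of groupby("A").sum().sort_index().

    Floats are added, strings are concatenated in row order, and
    booleans are added as 0/1.
    """
    groups = {}
    for a, b, c, d in rows:
        acc = groups.setdefault(a, [0.0, "", 0])
        acc[0] += b       # numeric sum
        acc[1] += c       # string "sum" is concatenation
        acc[2] += int(d)  # True -> 1, False -> 0
    # Sorting on the group key mirrors .sort_index().
    return {key: tuple(groups[key]) for key in sorted(groups)}


result = groupby_sum(rows)
for key, (b, c, d) in result.items():
    print(key, round(b, 1), c, d)
# 1 7.2 ab 1
# 2 7.2 ba 1
```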
   
   ### How was this patch tested?
   
   Updated the existing UTs to support string type columns.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1316539867


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   Just adjusted the comments. Thanks!




[GitHub] [spark] zhengruifeng commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1316574489


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3537,14 +3537,14 @@ def _reduce_for_stat_function(
                 if sfun.__name__ == "sum" and isinstance(
                     psdf._internal.spark_type_for(label), StringType
                 ):
+                    input_scol_name = psser._internal.data_spark_column_names[0]
                     # Sort data with natural order column to ensure order of data
-                    sorted_array = F.array_sort(
-                        F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
+                    output_scol = F.concat_ws(
+                        "",
+                        F.array_sort(
+                            F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
+                        ).getField(input_scol_name),

Review Comment:
   I think you will need `F.transform` to extract the strings.
   
   Otherwise, you can use `F.reduce` to directly concatenate the strings from the structs [<long, string>].
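The two options can be modeled in plain Python, assuming the sorted array holds `(natural_order, value)` pairs: the `transform` route maps each struct to its string field before `concat_ws`, while the `reduce` route folds the structs into one string directly. This is a semantic sketch only; the data and names are illustrative.

```python
from functools import reduce

# Hypothetical output of array_sort(collect_list(struct(order, value))).
sorted_structs = [(0, "a"), (2, "b"), (5, "c")]

# Option 1: transform-style extraction of the string field, then concat_ws("", ...).
values = [value for _order, value in sorted_structs]
via_transform = "".join(values)

# Option 2: reduce-style fold over the structs, starting from an empty string.
via_reduce = reduce(lambda acc, struct: acc + struct[1], sorted_structs, "")

print(via_transform, via_reduce)  # abc abc
```

In PySpark terms, option 1 would presumably look like `F.concat_ws("", F.transform(sorted_array, lambda s: s[input_scol_name]))` and option 2 like `F.reduce(sorted_array, F.lit(""), lambda acc, s: F.concat(acc, s[input_scol_name]))`, where `sorted_array` stands for the `array_sort(...)` column from the earlier revision. `F.reduce` exists only in newer PySpark releases (`F.aggregate` is the older equivalent). One consideration if values can be null: `concat` returns null when any input is null, whereas `concat_ws` skips null elements, so the `transform` route may stay closer to the original `concat_ws` behavior.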




[GitHub] [spark] zhengruifeng commented on pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #42798:
URL: https://github.com/apache/spark/pull/42798#issuecomment-1713172475

   merged to master



[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1316586840


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3537,14 +3537,14 @@ def _reduce_for_stat_function(
                 if sfun.__name__ == "sum" and isinstance(
                     psdf._internal.spark_type_for(label), StringType
                 ):
+                    input_scol_name = psser._internal.data_spark_column_names[0]
                     # Sort data with natural order column to ensure order of data
-                    sorted_array = F.array_sort(
-                        F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
+                    output_scol = F.concat_ws(
+                        "",
+                        F.array_sort(
+                            F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
+                        ).getField(input_scol_name),

Review Comment:
   Sounds good. Updated the code with `transform`. Thanks!




[GitHub] [spark] zhengruifeng commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1316572356


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   Yes, we should extract the string column.




[GitHub] [spark] zhengruifeng commented on pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #42798:
URL: https://github.com/apache/spark/pull/42798#issuecomment-1705020773

   @itholic I suspect the behavior is not deterministic: it depends on the internal order of `collect_list`.
   
   To make it deterministic, I think we need to `collect_list` both the value and the index, and sort by the indices before `concat_ws`.
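The following stdlib-only simulation illustrates the point, under the assumption that each partition's partial list can reach the final aggregate in any order. The rows are already in natural order inside each partition, yet naive concatenation still depends on arrival order, while carrying the index and sorting on it does not. All names here are hypothetical.

```python
from itertools import permutations

# Hypothetical (natural_order, value) rows for one group. Each inner list is
# one partition, already in natural order before the shuffle.
partitions = [[(0, "a"), (1, "b")], [(2, "c")], [(3, "d"), (4, "e")]]


def arrival_orders(parts):
    """Yield every order in which the partial lists could reach the aggregate."""
    for order in permutations(parts):
        yield [row for part in order for row in part]


def naive_concat(rows):
    # Like concat_ws("", collect_list(value)): follows arrival order.
    return "".join(value for _index, value in rows)


def ordered_concat(rows):
    # Collect (index, value) pairs, sort by index, then concatenate.
    return "".join(value for _index, value in sorted(rows))


naive_results = {naive_concat(rows) for rows in arrival_orders(partitions)}
ordered_results = {ordered_concat(rows) for rows in arrival_orders(partitions)}
print(len(naive_results))  # 6
print(ordered_results)     # {'abcde'}
```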



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1315317140


##########
python/pyspark/pandas/groupby.py:
##########
@@ -910,7 +910,7 @@ def sum(self, numeric_only: Optional[bool] = True, min_count: int = 0) -> FrameL
 

Review Comment:
   I think you need to fix the doc above too, since we now support strings?




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1315318972


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   Can we sort by natural order? We have the `compute.ordered_head` config. We could sort by natural order and then perform `collect_list`.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1315417026


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   Since string columns are computed together with numerical ones, I think we have to compute the strings' sum as an aggregation:
   
   ```
   F.concat_ws("", F.array_sort(
     F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
   ))
   ```
   
   For struct types, `array_sort` sorts elements by the first field and then by the second field, IIRC.
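Python tuples compare field by field, which offers a quick way to reason about the ordering described above, assuming Spark orders structs lexicographically by field as the comment recalls. The data is hypothetical, and null handling is not modeled (Python cannot compare `None` with `int`). In practice the natural-order ids are unique, so the second field would never need to break a tie.

```python
# Hypothetical collected structs as (natural_order, value) tuples,
# in whatever order collect_list produced them.
collected = [(3, "a"), (1, "z"), (2, "m"), (1, "b")]

# Tuples compare on the first field and fall back to the second on ties,
# mirroring lexicographic struct ordering in array_sort.
print(sorted(collected))  # [(1, 'b'), (1, 'z'), (2, 'm'), (3, 'a')]

# Extracting the string field after sorting gives the concatenated result.
print("".join(value for _order, value in sorted(collected)))  # bzma
```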




[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1314651862


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   We should use a combination of `concat_ws` and `collect_list` instead of `sum` to match the pandas behavior for string summation, as shown below:
   
   ```python
   >>> import pyspark.sql.functions as sf
   >>> sdf.show()
   +---+
   |  A|
   +---+
   |  a|
   |  b|
   |  c|
   +---+
   
   # Using `sum` over a string type column returns `NULL`, which does not match pandas.
   >>> sdf.select(sf.sum(sdf.A)).show()
   +------+
   |sum(A)|
   +------+
   |  NULL|
   +------+
   
   # Using a combination of `concat_ws` and `collect_list` matches the pandas behavior
   >>> sdf.select(sf.concat_ws("", sf.collect_list(sdf.A))).show()
   +----------------------------+
   |concat_ws(, collect_list(A))|
   +----------------------------+
   |                         abc|
   +----------------------------+
   ```
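To see why the two queries differ, here is a plain-Python model. It assumes non-ANSI mode, where Spark implicitly casts the string column to double for `sum` and unparseable strings become null (under ANSI mode the cast behaves differently). The helper names are hypothetical.

```python
def to_double_or_null(s):
    """Model a non-ANSI string -> double cast: unparseable input becomes None."""
    try:
        return float(s)
    except (TypeError, ValueError):
        return None


def spark_sum(values):
    """Model SUM: ignore nulls, return None when no non-null input remains."""
    nums = [v for v in map(to_double_or_null, values) if v is not None]
    return sum(nums) if nums else None


def concat_ws_collect_list(values, sep=""):
    """Model concat_ws(sep, collect_list(col)): collect_list drops nulls."""
    return sep.join(v for v in values if v is not None)


column_a = ["a", "b", "c"]
print(spark_sum(column_a))               # None (displayed as NULL by Spark)
print(concat_ws_collect_list(column_a))  # abc
```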




[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1315352479


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   I think this may be a different case from `head`?
   
   In `head`, we do `sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)` and then compute `sdf.limit(n)`, so we can keep the order because `DataFrame.limit` doesn't shuffle the data.
   
   But in this case, computing `collect_list` shuffles the data again even if the DataFrame was sorted by natural order in advance, so I think the order would not be guaranteed.
   
   Please let me know if I missed something.




[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1315344811


##########
python/pyspark/pandas/groupby.py:
##########
@@ -910,7 +910,7 @@ def sum(self, numeric_only: Optional[bool] = True, min_count: int = 0) -> FrameL
 

Review Comment:
   Yeah, we should update it. Thanks for catching this!




[GitHub] [spark] itholic commented on a diff in pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42798:
URL: https://github.com/apache/spark/pull/42798#discussion_r1316539576


##########
python/pyspark/pandas/groupby.py:
##########
@@ -3534,7 +3534,12 @@ def _reduce_for_stat_function(
             for label in psdf._internal.column_labels:
                 psser = psdf._psser_for(label)
                 input_scol = psser._dtype_op.nan_to_null(psser).spark.column
-                output_scol = sfun(input_scol)
+                if sfun.__name__ == "sum" and isinstance(
+                    psdf._internal.spark_type_for(label), StringType
+                ):
+                    output_scol = F.concat_ws("", F.collect_list(input_scol))

Review Comment:
   Yeah, then maybe we should extract only the string field from the array of structs and pass it as the argument to `concat_ws`?
   
   ```
   F.concat_ws(
       "",
       F.array_sort(
           F.collect_list(F.struct(NATURAL_ORDER_COLUMN_NAME, input_scol))
       ).getField(input_scol_name),
   )
   ```




[GitHub] [spark] itholic commented on pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on PR #42798:
URL: https://github.com/apache/spark/pull/42798#issuecomment-1705851281

   @zhengruifeng I think the problem is that pandas computes the concatenation without sorting, so the result can differ when the index is not sorted, as shown below:
   ## Problem
   
   **Pandas**
   ```python
   >>> pdf
      A  B
   4  a  1
   3  b  2
   2  c  3
   >>> pdf.sum()
   A    abc
   B      6
   dtype: object
   ```
   
   **Pandas API on Spark**
   ```python
   >>> psdf
      A  B
   4  a  1
   3  b  2
   2  c  3
   >>> psdf.sum()
   A    cba  # we internally sort by the index, so the result differs from pandas
   B      6
   dtype: object
   ```
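The discrepancy comes down to the sort key. A minimal sketch, assuming a column that records each row's original position is available (the hypothetical `natural_order` field below; the `NATURAL_ORDER_COLUMN_NAME` column used in later revisions of this PR may play that role):

```python
# Rows of the example above as (index, natural_order, A).
# natural_order is a hypothetical original-row-position column.
rows = [(4, 0, "a"), (3, 1, "b"), (2, 2, "c")]


def concat_sorted_by(rows, key):
    """Concatenate column A after sorting the rows with the given key."""
    return "".join(a for _index, _position, a in sorted(rows, key=key))


by_index = concat_sorted_by(rows, key=lambda row: row[0])     # sort by index label
by_position = concat_sorted_by(rows, key=lambda row: row[1])  # sort by row position

print(by_index)     # cba (matches the pandas-on-Spark output above)
print(by_position)  # abc (matches the pandas output above)
```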
   
   ## Solution
   I think for now we can pick one of the three options below:
   1. We can document the warning note as below:
       ```
       The result for string type columns is non-deterministic, since the implementation depends on PySpark's `collect_list` API, which is itself non-deterministic.
       ```
   2. We can `collect_list` both value and index, and sort by the indices before `concat_ws` as you suggested, and document the warning note as below:
       ```
       The result for string type columns can differ from pandas when the index is not sorted, since we always sort by the index before computing; this is needed because the implementation depends on PySpark's `collect_list` API, which is non-deterministic.
       ```
   3. We don't support the string type column like so far, and add a note that why we don't support the string type column as below:
       ```
       String type columns are not supported for now, because they might yield non-deterministic results, unlike in pandas.
       ```
   
   WDYT? Also cc @HyukjinKwon, @ueshin, @xinrong-meng: what strategy should we take for this situation? I believe the same rules should apply to similar cases that already exist or may arise in the future.



[GitHub] [spark] zhengruifeng closed pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #42798: [SPARK-43295][PS] Support string type columns for `DataFrameGroupBy.sum`
URL: https://github.com/apache/spark/pull/42798

