Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/27 00:23:47 UTC

[spark] branch branch-3.4 updated: [SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d4376e0cd11 [SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names
d4376e0cd11 is described below

commit d4376e0cd11f24e12628349d21f2ce84ff2aa12a
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Mon Feb 27 09:23:23 2023 +0900

    [SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names
    
    ### What changes were proposed in this pull request?
    
    Fixes `DataFrame.toPandas` to handle duplicated column names.
    
    ### Why are the changes needed?
    
    Currently,
    
    ```py
    spark.sql("select 1 v, 1 v").toPandas()
    ```
    
    fails with the error:
    
    ```py
    Traceback (most recent call last):
    ...
      File ".../python/pyspark/sql/connect/dataframe.py", line 1335, in toPandas
        return self._session.client.to_pandas(query)
      File ".../python/pyspark/sql/connect/client.py", line 548, in to_pandas
        pdf = table.to_pandas()
      File "pyarrow/array.pxi", line 830, in pyarrow.lib._PandasConvertible.to_pandas
      File "pyarrow/table.pxi", line 3908, in pyarrow.lib.Table._to_pandas
      File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 819, in table_to_blockmanager
        columns = _deserialize_column_index(table, all_columns, column_indexes)
      File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 938, in _deserialize_column_index
        columns = _flatten_single_level_multiindex(columns)
      File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 1186, in _flatten_single_level_multiindex
        raise ValueError('Found non-unique column index')
    ValueError: Found non-unique column index
    ```
    
    Similar to #28210.
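
    At the root, pyarrow's pandas conversion rejects an Arrow table whose column names are not unique. A minimal sketch that reproduces the failure with pyarrow alone (behavior as observed with pyarrow versions current at the time of this fix; the exact raise site may differ across versions):

    ```py
    import pyarrow as pa

    # Arrow itself allows duplicated field names in a schema ...
    table = pa.table([pa.array([1]), pa.array([1])], names=["v", "v"])

    # ... but the pandas conversion builds a column index from the names
    # and rejects it when it is not unique.
    table.to_pandas()  # ValueError: Found non-unique column index
    ```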
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `DataFrame.toPandas()` now preserves duplicated column names instead of raising an error.
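
    For example, the query shown above is expected to return a pandas DataFrame that keeps both duplicated labels (illustrative output):

    ```py
    >>> spark.sql("select 1 v, 1 v").toPandas()
       v  v
    0  1  1
    ```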
    
    ### How was this patch tested?
    
    Enabled related tests.
    
    Closes #40170 from ueshin/issues/SPARK-42574/toPandas.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 89cf490f12937eaac0bb04f6cf227294776557b4)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/connect/client.py               |  3 ++
 .../sql/tests/connect/test_parity_dataframe.py     |  8 ++---
 python/pyspark/sql/tests/test_dataframe.py         | 40 ++++++++++++----------
 3 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index d6a1df6ba93..91d2f96aee1 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -545,7 +545,10 @@ class SparkConnectClient(object):
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
         table, metrics = self._execute_and_fetch(req)
+        column_names = table.column_names
+        table = table.rename_columns([f"col_{i}" for i in range(len(column_names))])
         pdf = table.to_pandas()
+        pdf.columns = column_names
         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics
         return pdf
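
The hunk above renames the Arrow table's columns to unique placeholders (`col_0`, `col_1`, ...) before the pandas conversion, then restores the original names on the resulting DataFrame; pandas, unlike pyarrow's converter, tolerates duplicated labels. The same pattern in isolation, as a pyarrow-only sketch outside Spark:

```py
import pyarrow as pa

# to_pandas() requires unique column names, but pandas itself accepts
# duplicated labels, so rename before converting and restore afterwards.
table = pa.table([pa.array([1]), pa.array([1])], names=["v", "v"])

column_names = table.column_names  # ["v", "v"]
table = table.rename_columns([f"col_{i}" for i in range(len(column_names))])
pdf = table.to_pandas()     # succeeds: "col_0", "col_1" are unique
pdf.columns = column_names  # put the duplicated names back on the pandas side
```
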
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 800fe4a2298..97c0f473ce8 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -112,15 +112,11 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
     def test_to_pandas_from_null_dataframe(self):
         super().test_to_pandas_from_null_dataframe()
 
-    # TODO(SPARK-41834): Implement SparkSession.conf
-    @unittest.skip("Fails in Spark Connect, should enable.")
     def test_to_pandas_on_cross_join(self):
-        super().test_to_pandas_on_cross_join()
+        self.check_to_pandas_on_cross_join()
 
-    # TODO(SPARK-41834): Implement SparkSession.conf
-    @unittest.skip("Fails in Spark Connect, should enable.")
     def test_to_pandas_with_duplicated_column_names(self):
-        super().test_to_pandas_with_duplicated_column_names()
+        self.check_to_pandas_with_duplicated_column_names()
 
     # TODO(SPARK-42367): DataFrame.drop should handle duplicated columns properly
     @unittest.skip("Fails in Spark Connect, should enable.")
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 610edc0926d..e686fa9e929 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1129,19 +1129,27 @@ class DataFrameTestsMixin:
 
     @unittest.skipIf(not have_pandas, pandas_requirement_message)  # type: ignore
     def test_to_pandas_with_duplicated_column_names(self):
+        for arrow_enabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+                self.check_to_pandas_with_duplicated_column_names()
+
+    def check_to_pandas_with_duplicated_column_names(self):
         import numpy as np
 
         sql = "select 1 v, 1 v"
-        for arrowEnabled in [False, True]:
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
-                df = self.spark.sql(sql)
-                pdf = df.toPandas()
-                types = pdf.dtypes
-                self.assertEqual(types.iloc[0], np.int32)
-                self.assertEqual(types.iloc[1], np.int32)
+        df = self.spark.sql(sql)
+        pdf = df.toPandas()
+        types = pdf.dtypes
+        self.assertEqual(types.iloc[0], np.int32)
+        self.assertEqual(types.iloc[1], np.int32)
 
     @unittest.skipIf(not have_pandas, pandas_requirement_message)  # type: ignore
     def test_to_pandas_on_cross_join(self):
+        for arrow_enabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+                self.check_to_pandas_on_cross_join()
+
+    def check_to_pandas_on_cross_join(self):
         import numpy as np
 
         sql = """
@@ -1151,18 +1159,12 @@ class DataFrameTestsMixin:
           select explode(sequence(1, 3)) v
         ) t2
         """
-        for arrowEnabled in [False, True]:
-            with self.sql_conf(
-                {
-                    "spark.sql.crossJoin.enabled": True,
-                    "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled,
-                }
-            ):
-                df = self.spark.sql(sql)
-                pdf = df.toPandas()
-                types = pdf.dtypes
-                self.assertEqual(types.iloc[0], np.int32)
-                self.assertEqual(types.iloc[1], np.int32)
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            df = self.spark.sql(sql)
+            pdf = df.toPandas()
+            types = pdf.dtypes
+            self.assertEqual(types.iloc[0], np.int32)
+            self.assertEqual(types.iloc[1], np.int32)
 
     @unittest.skipIf(have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
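
The test changes above follow a `test_*`/`check_*` split: the classic suite keeps the Arrow on/off loop in `test_*`, while the shared assertions move into `check_*` so the Spark Connect parity suite can run them directly. A condensed sketch of the pattern (method and class names follow the diff; the surrounding test harness is assumed):

```py
class DataFrameTestsMixin:
    def test_to_pandas_with_duplicated_column_names(self):
        # Classic PySpark: exercise both the Arrow and non-Arrow paths.
        for arrow_enabled in [False, True]:
            with self.sql_conf(
                {"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}
            ):
                self.check_to_pandas_with_duplicated_column_names()

    def check_to_pandas_with_duplicated_column_names(self):
        ...  # shared assertions, run unchanged by both suites


class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
    def test_to_pandas_with_duplicated_column_names(self):
        # Spark Connect: run the shared body directly, no conf loop needed.
        self.check_to_pandas_with_duplicated_column_names()
```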

