You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/27 00:23:47 UTC
[spark] branch branch-3.4 updated: [SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new d4376e0cd11 [SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names
d4376e0cd11 is described below
commit d4376e0cd11f24e12628349d21f2ce84ff2aa12a
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Mon Feb 27 09:23:23 2023 +0900
[SPARK-42574][CONNECT][PYTHON] Fix toPandas to handle duplicated column names
### What changes were proposed in this pull request?
Fixes `DataFrame.toPandas` to handle duplicated column names.
### Why are the changes needed?
Currently
```py
spark.sql("select 1 v, 1 v").toPandas()
```
fails with the error:
```py
Traceback (most recent call last):
...
File ".../python/pyspark/sql/connect/dataframe.py", line 1335, in toPandas
return self._session.client.to_pandas(query)
File ".../python/pyspark/sql/connect/client.py", line 548, in to_pandas
pdf = table.to_pandas()
File "pyarrow/array.pxi", line 830, in pyarrow.lib._PandasConvertible.to_pandas
File "pyarrow/table.pxi", line 3908, in pyarrow.lib.Table._to_pandas
File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 819, in table_to_blockmanager
columns = _deserialize_column_index(table, all_columns, column_indexes)
File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 938, in _deserialize_column_index
columns = _flatten_single_level_multiindex(columns)
File "/.../lib/python3.9/site-packages/pyarrow/pandas_compat.py", line 1186, in _flatten_single_level_multiindex
raise ValueError('Found non-unique column index')
ValueError: Found non-unique column index
```
Similar to #28210.
### Does this PR introduce _any_ user-facing change?
Duplicated column names will be available when calling `toPandas()`.
### How was this patch tested?
Enabled related tests.
Closes #40170 from ueshin/issues/SPARK-42574/toPandas.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 89cf490f12937eaac0bb04f6cf227294776557b4)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/connect/client.py | 3 ++
.../sql/tests/connect/test_parity_dataframe.py | 8 ++---
python/pyspark/sql/tests/test_dataframe.py | 40 ++++++++++++----------
3 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index d6a1df6ba93..91d2f96aee1 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -545,7 +545,10 @@ class SparkConnectClient(object):
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
table, metrics = self._execute_and_fetch(req)
+ column_names = table.column_names
+ table = table.rename_columns([f"col_{i}" for i in range(len(column_names))])
pdf = table.to_pandas()
+ pdf.columns = column_names
if len(metrics) > 0:
pdf.attrs["metrics"] = metrics
return pdf
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 800fe4a2298..97c0f473ce8 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -112,15 +112,11 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
def test_to_pandas_from_null_dataframe(self):
super().test_to_pandas_from_null_dataframe()
- # TODO(SPARK-41834): Implement SparkSession.conf
- @unittest.skip("Fails in Spark Connect, should enable.")
def test_to_pandas_on_cross_join(self):
- super().test_to_pandas_on_cross_join()
+ self.check_to_pandas_on_cross_join()
- # TODO(SPARK-41834): Implement SparkSession.conf
- @unittest.skip("Fails in Spark Connect, should enable.")
def test_to_pandas_with_duplicated_column_names(self):
- super().test_to_pandas_with_duplicated_column_names()
+ self.check_to_pandas_with_duplicated_column_names()
# TODO(SPARK-42367): DataFrame.drop should handle duplicated columns properly
@unittest.skip("Fails in Spark Connect, should enable.")
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 610edc0926d..e686fa9e929 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1129,19 +1129,27 @@ class DataFrameTestsMixin:
@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore
def test_to_pandas_with_duplicated_column_names(self):
+ for arrow_enabled in [False, True]:
+ with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+ self.check_to_pandas_with_duplicated_column_names()
+
+ def check_to_pandas_with_duplicated_column_names(self):
import numpy as np
sql = "select 1 v, 1 v"
- for arrowEnabled in [False, True]:
- with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
- df = self.spark.sql(sql)
- pdf = df.toPandas()
- types = pdf.dtypes
- self.assertEqual(types.iloc[0], np.int32)
- self.assertEqual(types.iloc[1], np.int32)
+ df = self.spark.sql(sql)
+ pdf = df.toPandas()
+ types = pdf.dtypes
+ self.assertEqual(types.iloc[0], np.int32)
+ self.assertEqual(types.iloc[1], np.int32)
@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore
def test_to_pandas_on_cross_join(self):
+ for arrow_enabled in [False, True]:
+ with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+ self.check_to_pandas_on_cross_join()
+
+ def check_to_pandas_on_cross_join(self):
import numpy as np
sql = """
@@ -1151,18 +1159,12 @@ class DataFrameTestsMixin:
select explode(sequence(1, 3)) v
) t2
"""
- for arrowEnabled in [False, True]:
- with self.sql_conf(
- {
- "spark.sql.crossJoin.enabled": True,
- "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled,
- }
- ):
- df = self.spark.sql(sql)
- pdf = df.toPandas()
- types = pdf.dtypes
- self.assertEqual(types.iloc[0], np.int32)
- self.assertEqual(types.iloc[1], np.int32)
+ with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+ df = self.spark.sql(sql)
+ pdf = df.toPandas()
+ types = pdf.dtypes
+ self.assertEqual(types.iloc[0], np.int32)
+ self.assertEqual(types.iloc[1], np.int32)
@unittest.skipIf(have_pandas, "Required Pandas was found.")
def test_to_pandas_required_pandas_not_found(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org