You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/14 05:11:01 UTC
[spark] branch branch-3.0 updated: [SPARK-31441] Support duplicated column names for toPandas with arrow execution
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f57f7a7 [SPARK-31441] Support duplicated column names for toPandas with arrow execution
f57f7a7 is described below
commit f57f7a766d30854aaba72068dcfbee7acfa50cc5
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Tue Apr 14 14:08:56 2020 +0900
[SPARK-31441] Support duplicated column names for toPandas with arrow execution
### What changes were proposed in this pull request?
This PR adds support for duplicated column names in `toPandas` with Arrow execution.
### Why are the changes needed?
When we execute `toPandas()` with Arrow execution enabled, it fails if the column names contain duplicates.
```py
>>> spark.sql("select 1 v, 1 v").toPandas()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 2132, in toPandas
    pdf = table.to_pandas()
  File "pyarrow/array.pxi", line 441, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 653, in table_to_blockmanager
    columns = _deserialize_column_index(table, all_columns, column_indexes)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 704, in _deserialize_column_index
    columns = _flatten_single_level_multiindex(columns)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 937, in _flatten_single_level_multiindex
    raise ValueError('Found non-unique column index')
ValueError: Found non-unique column index
```
### Does this PR introduce any user-facing change?
Yes. Previously, the error above was raised; after this PR, the result is:
```py
>>> spark.sql("select 1 v, 1 v").toPandas()
   v  v
0  1  1
```
### How was this patch tested?
Added and modified related tests.
Closes #28210 from ueshin/issues/SPARK-31441/to_pandas.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 87be3641eb3517862dd5073903d5b37275852066)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
python/pyspark/sql/pandas/conversion.py | 6 +++++-
python/pyspark/sql/tests/test_dataframe.py | 27 +++++++++++++++++++++------
2 files changed, 26 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 47cf8bb..251625a 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -103,13 +103,17 @@ class PandasConversionMixin(object):
try:
from pyspark.sql.pandas.types import _check_series_localize_timestamps
import pyarrow
- batches = self._collect_as_arrow()
+ # Rename columns to avoid duplicated column names.
+ tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+ batches = self.toDF(*tmp_column_names)._collect_as_arrow()
if len(batches) > 0:
table = pyarrow.Table.from_batches(batches)
# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior with when
# Arrow optimization is disabled.
pdf = table.to_pandas(date_as_object=True)
+ # Rename back to the original column names.
+ pdf.columns = self.columns
for field in self.schema:
if isinstance(field.dataType, TimestampType):
pdf[field.name] = \
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d9dcbc0..738c984 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -530,6 +530,19 @@ class DataFrameTests(ReusedSQLTestCase):
self.assertEquals(types[5], 'datetime64[ns]')
@unittest.skipIf(not have_pandas, pandas_requirement_message)
+ def test_to_pandas_with_duplicated_column_names(self):
+ import numpy as np
+
+ sql = "select 1 v, 1 v"
+ for arrowEnabled in [False, True]:
+ with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+ df = self.spark.sql(sql)
+ pdf = df.toPandas()
+ types = pdf.dtypes
+ self.assertEquals(types.iloc[0], np.int32)
+ self.assertEquals(types.iloc[1], np.int32)
+
+ @unittest.skipIf(not have_pandas, pandas_requirement_message)
def test_to_pandas_on_cross_join(self):
import numpy as np
@@ -540,12 +553,14 @@ class DataFrameTests(ReusedSQLTestCase):
select explode(sequence(1, 3)) v
) t2
"""
- with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
- df = self.spark.sql(sql)
- pdf = df.toPandas()
- types = pdf.dtypes
- self.assertEquals(types.iloc[0], np.int32)
- self.assertEquals(types.iloc[1], np.int32)
+ for arrowEnabled in [False, True]:
+ with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+ "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+ df = self.spark.sql(sql)
+ pdf = df.toPandas()
+ types = pdf.dtypes
+ self.assertEquals(types.iloc[0], np.int32)
+ self.assertEquals(types.iloc[1], np.int32)
@unittest.skipIf(have_pandas, "Required Pandas was found.")
def test_to_pandas_required_pandas_not_found(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org