Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/14 05:11:01 UTC

[spark] branch branch-3.0 updated: [SPARK-31441] Support duplicated column names for toPandas with arrow execution

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f57f7a7  [SPARK-31441] Support duplicated column names for toPandas with arrow execution
f57f7a7 is described below

commit f57f7a766d30854aaba72068dcfbee7acfa50cc5
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Tue Apr 14 14:08:56 2020 +0900

    [SPARK-31441] Support duplicated column names for toPandas with arrow execution
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for duplicated column names in `toPandas` with Arrow execution enabled.
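    
    The fix renames the columns to unique temporary names before collecting the data as Arrow batches, then restores the original names on the resulting pandas DataFrame. A minimal sketch of the same idea using only public APIs, assuming `df` is any PySpark DataFrame (the actual change applies this internally around `_collect_as_arrow`, as shown in the diff below):
    
    ```py
    # Sketch only, not the patch itself: rename to unique placeholder names,
    # convert to pandas, then restore the original (possibly duplicated) names.
    tmp_names = ['col_{}'.format(i) for i in range(len(df.columns))]
    pdf = df.toDF(*tmp_names).toPandas()
    pdf.columns = df.columns
    ```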
    
    ### Why are the changes needed?
    
    When we execute `toPandas()` with Arrow execution enabled, it fails if the column names contain duplicates.
    
    ```py
    >>> spark.sql("select 1 v, 1 v").toPandas()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/path/to/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 2132, in toPandas
        pdf = table.to_pandas()
      File "pyarrow/array.pxi", line 441, in pyarrow.lib._PandasConvertible.to_pandas
      File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
      File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 653, in table_to_blockmanager
        columns = _deserialize_column_index(table, all_columns, column_indexes)
      File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 704, in _deserialize_column_index
        columns = _flatten_single_level_multiindex(columns)
      File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 937, in _flatten_single_level_multiindex
        raise ValueError('Found non-unique column index')
    ValueError: Found non-unique column index
    ```
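    
    The error is raised by pyarrow's table-to-pandas conversion, which rejects non-unique column names while building the pandas column index. A standalone reproduction of the underlying behavior (assuming a pyarrow version from around the time of this commit; later releases may handle duplicates differently):
    
    ```py
    import pyarrow as pa
    
    # Two columns that share the name 'v', like the result of "select 1 v, 1 v".
    table = pa.Table.from_arrays([pa.array([1]), pa.array([1])], names=['v', 'v'])
    table.to_pandas()  # raises ValueError: Found non-unique column index
    ```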
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Previously the call failed with the error above; after this PR, it returns the expected result:
    
    ```py
    >>> spark.sql("select 1 v, 1 v").toPandas()
       v  v
    0  1  1
    ```
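    
    As is standard pandas behavior, selecting the duplicated label on the returned DataFrame yields a sub-DataFrame containing both columns rather than a single Series:
    
    ```py
    >>> pdf = spark.sql("select 1 v, 1 v").toPandas()
    >>> pdf['v']
       v  v
    0  1  1
    ```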
    
    ### How was this patch tested?
    
    Added and modified related tests.
    
    Closes #28210 from ueshin/issues/SPARK-31441/to_pandas.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 87be3641eb3517862dd5073903d5b37275852066)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/pandas/conversion.py    |  6 +++++-
 python/pyspark/sql/tests/test_dataframe.py | 27 +++++++++++++++++++++------
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 47cf8bb..251625a 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -103,13 +103,17 @@ class PandasConversionMixin(object):
                 try:
                     from pyspark.sql.pandas.types import _check_series_localize_timestamps
                     import pyarrow
-                    batches = self._collect_as_arrow()
+                    # Rename columns to avoid duplicated column names.
+                    tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                    batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
                         pdf = table.to_pandas(date_as_object=True)
+                        # Rename back to the original column names.
+                        pdf.columns = self.columns
                         for field in self.schema:
                             if isinstance(field.dataType, TimestampType):
                                 pdf[field.name] = \
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d9dcbc0..738c984 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -530,6 +530,19 @@ class DataFrameTests(ReusedSQLTestCase):
         self.assertEquals(types[5], 'datetime64[ns]')
 
     @unittest.skipIf(not have_pandas, pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
 
@@ -540,12 +553,14 @@ class DataFrameTests(ReusedSQLTestCase):
           select explode(sequence(1, 3)) v
         ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):

