Posted to commits@spark.apache.org by gu...@apache.org on 2022/04/28 00:20:35 UTC

[spark] branch branch-3.3 updated: [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 84addc5d1d8 [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`
84addc5d1d8 is described below

commit 84addc5d1d8359a5b716ec869489fc961af23cf2
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Thu Apr 28 09:17:24 2022 +0900

    [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`
    
    ### What changes were proposed in this pull request?
    Minor refactoring of `python/pyspark/sql/pandas/conversion.py`, which includes:
    - doc change
    - renaming
    
    ### Why are the changes needed?
    To improve code readability and maintainability.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #36384 from xinrong-databricks/conversion.py.
    
    Authored-by: Xinrong Meng <xi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit c19fadabde3ef3f9c7e4fa9bf74632a4f8e1f3e2)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/pandas/conversion.py | 52 ++++++++++++++++-----------------
 1 file changed, 25 insertions(+), 27 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 7153450d2bc..808444f1e2e 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -51,7 +51,7 @@ if TYPE_CHECKING:
 
 class PandasConversionMixin:
     """
-    Min-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame`
+    Mix-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame`
     can use this class.
     """
 
@@ -65,10 +65,10 @@ class PandasConversionMixin:
 
         Notes
         -----
-        This method should only be used if the resulting Pandas's :class:`DataFrame` is
+        This method should only be used if the resulting Pandas ``pandas.DataFrame`` is
         expected to be small, as all the data is loaded into the driver's memory.
 
-        Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
+        Usage with ``spark.sql.execution.arrow.pyspark.enabled=True`` is experimental.
 
         Examples
         --------
@@ -136,8 +136,7 @@ class PandasConversionMixin:
 
                     # Rename columns to avoid duplicated column names.
                     tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
-                    c = self.sparkSession._jconf
-                    self_destruct = c.arrowPySparkSelfDestructEnabled()
+                    self_destruct = jconf.arrowPySparkSelfDestructEnabled()
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow(
                         split_batches=self_destruct
                     )
@@ -176,11 +175,11 @@ class PandasConversionMixin:
                     else:
                         corrected_panda_types = {}
                         for index, field in enumerate(self.schema):
-                            panda_type = PandasConversionMixin._to_corrected_pandas_type(
+                            pandas_type = PandasConversionMixin._to_corrected_pandas_type(
                                 field.dataType
                             )
                             corrected_panda_types[tmp_column_names[index]] = (
-                                np.object0 if panda_type is None else panda_type
+                                np.object0 if pandas_type is None else pandas_type
                             )
 
                         pdf = pd.DataFrame(columns=tmp_column_names).astype(
@@ -206,36 +205,37 @@ class PandasConversionMixin:
         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
         column_counter = Counter(self.columns)
 
-        dtype: List[Optional[Type]] = [None] * len(self.schema)
-        for fieldIdx, field in enumerate(self.schema):
-            # For duplicate column name, we use `iloc` to access it.
+        corrected_dtypes: List[Optional[Type]] = [None] * len(self.schema)
+        for index, field in enumerate(self.schema):
+            # We use `iloc` to access columns with duplicate column names.
             if column_counter[field.name] > 1:
-                pandas_col = pdf.iloc[:, fieldIdx]
+                pandas_col = pdf.iloc[:, index]
             else:
                 pandas_col = pdf[field.name]
 
             pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType)
             # SPARK-21766: if an integer field is nullable and has null values, it can be
-            # inferred by pandas as float column. Once we convert the column with NaN back
-            # to integer type e.g., np.int16, we will hit exception. So we use the inferred
-            # float type, not the corrected type from the schema in this case.
+            # inferred by pandas as a float column. If we convert the column with NaN back
+            # to integer type e.g., np.int16, we will hit an exception. So we use the
+            # pandas-inferred float type, rather than the corrected type from the schema
+            # in this case.
             if pandas_type is not None and not (
                 isinstance(field.dataType, IntegralType)
                 and field.nullable
                 and pandas_col.isnull().any()
             ):
-                dtype[fieldIdx] = pandas_type
-            # Ensure we fall back to nullable numpy types, even when whole column is null:
+                corrected_dtypes[index] = pandas_type
+            # Ensure we fall back to nullable numpy types.
             if isinstance(field.dataType, IntegralType) and pandas_col.isnull().any():
-                dtype[fieldIdx] = np.float64
+                corrected_dtypes[index] = np.float64
             if isinstance(field.dataType, BooleanType) and pandas_col.isnull().any():
-                dtype[fieldIdx] = np.object  # type: ignore[attr-defined]
+                corrected_dtypes[index] = np.object  # type: ignore[attr-defined]
 
         df = pd.DataFrame()
-        for index, t in enumerate(dtype):
+        for index, t in enumerate(corrected_dtypes):
             column_name = self.schema[index].name
 
-            # For duplicate column name, we use `iloc` to access it.
+            # We use `iloc` to access columns with duplicate column names.
             if column_counter[column_name] > 1:
                 series = pdf.iloc[:, index]
             else:
@@ -255,25 +255,23 @@ class PandasConversionMixin:
             else:
                 df[column_name] = series
 
-        pdf = df
-
         if timezone is None:
-            return pdf
+            return df
         else:
             from pyspark.sql.pandas.types import _check_series_convert_timestamps_local_tz
 
             for field in self.schema:
                 # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
                 if isinstance(field.dataType, TimestampType):
-                    pdf[field.name] = _check_series_convert_timestamps_local_tz(
-                        pdf[field.name], timezone
+                    df[field.name] = _check_series_convert_timestamps_local_tz(
+                        df[field.name], timezone
                     )
-            return pdf
+            return df
 
     @staticmethod
     def _to_corrected_pandas_type(dt: DataType) -> Optional[Type]:
         """
-        When converting Spark SQL records to Pandas :class:`DataFrame`, the inferred data type
+        When converting Spark SQL records to Pandas `pandas.DataFrame`, the inferred data type
         may be wrong. This method gets the corrected data type for Pandas if that type may be
         inferred incorrectly.
         """

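The SPARK-21766 comment in the diff above is the heart of the non-Arrow fallback logic: a nullable integer column that actually contains nulls must keep the pandas-inferred float dtype, because NumPy integer dtypes cannot represent NaN. A minimal sketch with plain pandas and NumPy (no Spark required; the values are illustrative) reproduces the behavior:

```python
import numpy as np
import pandas as pd

# A nullable "integer" column containing a null: pandas infers float64,
# since NumPy integer dtypes have no representation for NaN.
col = pd.Series([1, None, 3])
print(col.dtype)  # float64

# Forcing the schema's integer type back onto the column fails on the NaN,
# which is exactly the exception the comment warns about.
try:
    col.astype(np.int16)
except ValueError as err:
    print("cannot cast:", err)

# A nullable boolean column containing a null falls back to object dtype
# for the same reason, matching the BooleanType branch in the diff.
flags = pd.Series([True, None, False])
print(flags.dtype)  # object
```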

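For context on the code path being refactored, here is a minimal usage sketch of `DataFrame.toPandas` with the Arrow optimization enabled. The session setup and sample data are illustrative, not part of this commit:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Opt in to the Arrow-based conversion path; as the docstring in the diff
# notes, usage with this flag enabled is experimental.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# toPandas() loads the entire result into the driver's memory, so it should
# only be used when the resulting pandas DataFrame is expected to be small.
pdf = df.toPandas()
print(pdf.dtypes)
```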