You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/04/28 00:20:35 UTC
[spark] branch branch-3.3 updated: [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 84addc5d1d8 [SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`
84addc5d1d8 is described below
commit 84addc5d1d8359a5b716ec869489fc961af23cf2
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Thu Apr 28 09:17:24 2022 +0900
[SPARK-39051][PYTHON] Minor refactoring of `python/pyspark/sql/pandas/conversion.py`
Minor refactoring of `python/pyspark/sql/pandas/conversion.py`, which includes:
- doc change
- renaming
Why: to improve code readability and maintainability.
Does this PR introduce any user-facing change? No.
How was this patch tested? By existing tests.
Closes #36384 from xinrong-databricks/conversion.py.
Authored-by: Xinrong Meng <xi...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit c19fadabde3ef3f9c7e4fa9bf74632a4f8e1f3e2)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/pandas/conversion.py | 52 ++++++++++++++++-----------------
1 file changed, 25 insertions(+), 27 deletions(-)
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 7153450d2bc..808444f1e2e 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -51,7 +51,7 @@ if TYPE_CHECKING:
class PandasConversionMixin:
"""
- Min-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame`
+ Mix-in for the conversion from Spark to pandas. Currently, only :class:`DataFrame`
can use this class.
"""
@@ -65,10 +65,10 @@ class PandasConversionMixin:
Notes
-----
- This method should only be used if the resulting Pandas's :class:`DataFrame` is
+ This method should only be used if the resulting Pandas ``pandas.DataFrame`` is
expected to be small, as all the data is loaded into the driver's memory.
- Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental.
+ Usage with ``spark.sql.execution.arrow.pyspark.enabled=True`` is experimental.
Examples
--------
@@ -136,8 +136,7 @@ class PandasConversionMixin:
# Rename columns to avoid duplicated column names.
tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
- c = self.sparkSession._jconf
- self_destruct = c.arrowPySparkSelfDestructEnabled()
+ self_destruct = jconf.arrowPySparkSelfDestructEnabled()
batches = self.toDF(*tmp_column_names)._collect_as_arrow(
split_batches=self_destruct
)
@@ -176,11 +175,11 @@ class PandasConversionMixin:
else:
corrected_panda_types = {}
for index, field in enumerate(self.schema):
- panda_type = PandasConversionMixin._to_corrected_pandas_type(
+ pandas_type = PandasConversionMixin._to_corrected_pandas_type(
field.dataType
)
corrected_panda_types[tmp_column_names[index]] = (
- np.object0 if panda_type is None else panda_type
+ np.object0 if pandas_type is None else pandas_type
)
pdf = pd.DataFrame(columns=tmp_column_names).astype(
@@ -206,36 +205,37 @@ class PandasConversionMixin:
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
column_counter = Counter(self.columns)
- dtype: List[Optional[Type]] = [None] * len(self.schema)
- for fieldIdx, field in enumerate(self.schema):
- # For duplicate column name, we use `iloc` to access it.
+ corrected_dtypes: List[Optional[Type]] = [None] * len(self.schema)
+ for index, field in enumerate(self.schema):
+ # We use `iloc` to access columns with duplicate column names.
if column_counter[field.name] > 1:
- pandas_col = pdf.iloc[:, fieldIdx]
+ pandas_col = pdf.iloc[:, index]
else:
pandas_col = pdf[field.name]
pandas_type = PandasConversionMixin._to_corrected_pandas_type(field.dataType)
# SPARK-21766: if an integer field is nullable and has null values, it can be
- # inferred by pandas as float column. Once we convert the column with NaN back
- # to integer type e.g., np.int16, we will hit exception. So we use the inferred
- # float type, not the corrected type from the schema in this case.
+ # inferred by pandas as a float column. If we convert the column with NaN back
+ # to integer type e.g., np.int16, we will hit an exception. So we use the
+ # pandas-inferred float type, rather than the corrected type from the schema
+ # in this case.
if pandas_type is not None and not (
isinstance(field.dataType, IntegralType)
and field.nullable
and pandas_col.isnull().any()
):
- dtype[fieldIdx] = pandas_type
- # Ensure we fall back to nullable numpy types, even when whole column is null:
+ corrected_dtypes[index] = pandas_type
+ # Ensure we fall back to nullable numpy types.
if isinstance(field.dataType, IntegralType) and pandas_col.isnull().any():
- dtype[fieldIdx] = np.float64
+ corrected_dtypes[index] = np.float64
if isinstance(field.dataType, BooleanType) and pandas_col.isnull().any():
- dtype[fieldIdx] = np.object # type: ignore[attr-defined]
+ corrected_dtypes[index] = np.object # type: ignore[attr-defined]
df = pd.DataFrame()
- for index, t in enumerate(dtype):
+ for index, t in enumerate(corrected_dtypes):
column_name = self.schema[index].name
- # For duplicate column name, we use `iloc` to access it.
+ # We use `iloc` to access columns with duplicate column names.
if column_counter[column_name] > 1:
series = pdf.iloc[:, index]
else:
@@ -255,25 +255,23 @@ class PandasConversionMixin:
else:
df[column_name] = series
- pdf = df
-
if timezone is None:
- return pdf
+ return df
else:
from pyspark.sql.pandas.types import _check_series_convert_timestamps_local_tz
for field in self.schema:
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
if isinstance(field.dataType, TimestampType):
- pdf[field.name] = _check_series_convert_timestamps_local_tz(
- pdf[field.name], timezone
+ df[field.name] = _check_series_convert_timestamps_local_tz(
+ df[field.name], timezone
)
- return pdf
+ return df
@staticmethod
def _to_corrected_pandas_type(dt: DataType) -> Optional[Type]:
"""
- When converting Spark SQL records to Pandas :class:`DataFrame`, the inferred data type
+ When converting Spark SQL records to Pandas `pandas.DataFrame`, the inferred data type
may be wrong. This method gets the corrected data type for Pandas if that type may be
inferred incorrectly.
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org