You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/21 09:25:50 UTC
[spark] branch master updated: [SPARK-40161][PS] Make Series.mode apply PandasMode
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e69b2df7a3b [SPARK-40161][PS] Make Series.mode apply PandasMode
e69b2df7a3b is described below
commit e69b2df7a3bb0cc77c315830180bb0c3e76957d7
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sun Aug 21 18:25:34 2022 +0900
[SPARK-40161][PS] Make Series.mode apply PandasMode
### What changes were proposed in this pull request?
1. Move the `PandasMode` wrapper into `pyspark.pandas.spark.functions`.
2. Apply `PandasMode` internally in `Series.mode`, so that only one pass over the dataset is needed.
### Why are the changes needed?
To simplify the existing implementation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing doctests
Closes #37596 from zhengruifeng/ps_update_series_mode.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/frame.py | 5 +----
python/pyspark/pandas/series.py | 12 ++++++------
python/pyspark/pandas/spark/functions.py | 5 +++++
3 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index df2b5fffa62..72913bc17d3 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -65,7 +65,6 @@ if TYPE_CHECKING:
from pandas.core.dtypes.common import infer_dtype_from_object
from pandas.core.accessor import CachedAccessor
from pandas.core.dtypes.inference import is_sequence
-from pyspark import SparkContext
from pyspark import StorageLevel
from pyspark.sql import Column, DataFrame as SparkDataFrame, functions as F
from pyspark.sql.functions import pandas_udf
@@ -12442,8 +12441,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
if numeric_only is None and axis == 0:
numeric_only = True
- sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
-
mode_scols: List[Column] = []
mode_col_names: List[str] = []
mode_labels: List[Label] = []
@@ -12455,7 +12452,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
if not numeric_only or is_numeric:
scol = psser.spark.column
- mode_scol = Column(sql_utils.pandasMode(scol._jc, dropna)).alias(col_name)
+ mode_scol = SF.mode(scol, dropna).alias(col_name)
mode_scols.append(mode_scol)
mode_col_names.append(col_name)
mode_labels.append(label)
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index e1b4ac3a3e3..fa99ddf76ce 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -4731,12 +4731,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
3 NaN
dtype: float64
"""
- ser_count = self.value_counts(dropna=dropna, sort=False)
- sdf_count = ser_count._internal.spark_frame
- most_value = ser_count.max()
- sdf_most_value = sdf_count.filter("count == {}".format(str(most_value)))
- sdf = sdf_most_value.select(
- F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME)
+ scol = self.spark.column
+ name = self._internal.data_spark_column_names[0]
+ sdf = (
+ self._internal.spark_frame.select(SF.mode(scol, dropna).alias(name))
+ .select(F.array_sort(F.col(name)).alias(name))
+ .select(F.explode(F.col(name)).alias(name))
)
internal = InternalFrame(spark_frame=sdf, index_spark_columns=None, column_labels=[None])
ser_mode = first_series(DataFrame(internal))
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 11f9dbbb8c0..58715b5f781 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -46,6 +46,11 @@ def kurt(col: Column) -> Column:
return Column(sc._jvm.PythonSQLUtils.pandasKurtosis(col._jc))
+def mode(col: Column, dropna: bool) -> Column:
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))
+
+
def repeat(col: Column, n: Union[int, Column]) -> Column:
"""
Repeats a string column n times, and returns it as a new string column.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org