Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/21 09:25:50 UTC

[spark] branch master updated: [SPARK-40161][PS] Make Series.mode apply PandasMode

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e69b2df7a3b [SPARK-40161][PS] Make Series.mode apply PandasMode
e69b2df7a3b is described below

commit e69b2df7a3bb0cc77c315830180bb0c3e76957d7
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sun Aug 21 18:25:34 2022 +0900

    [SPARK-40161][PS] Make Series.mode apply PandasMode
    
    ### What changes were proposed in this pull request?
    1. Move `PandasMode` into `pyspark.pandas.spark.functions`.
    2. Apply `PandasMode` internally, so that only one pass over the dataset is needed.
    
    ### Why are the changes needed?
    To simplify the existing implementation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing doctests.
    
    Closes #37596 from zhengruifeng/ps_update_series_mode.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
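
To illustrate the change described in the commit message: the user-facing behavior of `Series.mode` is unchanged; only the internal computation becomes single-pass. A minimal usage sketch (assuming a running pandas-on-Spark session; the sample values are illustrative):

    import pyspark.pandas as ps

    ser = ps.Series([1.0, 2.0, 2.0, None, None])

    # dropna=True (the default) ignores nulls: the mode is 2.0
    print(ser.mode())

    # dropna=False counts nulls too; here None ties with 2.0 and, as in
    # pandas, a multimodal result keeps all tied values
    print(ser.mode(dropna=False))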
---
 python/pyspark/pandas/frame.py           |  5 +----
 python/pyspark/pandas/series.py          | 12 ++++++------
 python/pyspark/pandas/spark/functions.py |  5 +++++
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index df2b5fffa62..72913bc17d3 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -65,7 +65,6 @@ if TYPE_CHECKING:
 from pandas.core.dtypes.common import infer_dtype_from_object
 from pandas.core.accessor import CachedAccessor
 from pandas.core.dtypes.inference import is_sequence
-from pyspark import SparkContext
 from pyspark import StorageLevel
 from pyspark.sql import Column, DataFrame as SparkDataFrame, functions as F
 from pyspark.sql.functions import pandas_udf
@@ -12442,8 +12441,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         if numeric_only is None and axis == 0:
             numeric_only = True
 
-        sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
-
         mode_scols: List[Column] = []
         mode_col_names: List[str] = []
         mode_labels: List[Label] = []
@@ -12455,7 +12452,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
             if not numeric_only or is_numeric:
                 scol = psser.spark.column
-                mode_scol = Column(sql_utils.pandasMode(scol._jc, dropna)).alias(col_name)
+                mode_scol = SF.mode(scol, dropna).alias(col_name)
                 mode_scols.append(mode_scol)
                 mode_col_names.append(col_name)
                 mode_labels.append(label)
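
The frame.py hunk above swaps direct JVM plumbing for the new shared helper: instead of each call site building `Column(sql_utils.pandasMode(...))` by hand, the expression comes from `pyspark.pandas.spark.functions`. A runnable sketch of the same per-column pattern (assuming an active SparkSession; the DataFrame and column names are illustrative, not the internal ones):

    from pyspark.sql import SparkSession, functions as F
    from pyspark.pandas.spark import functions as SF

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1, "a"), (1, "b"), (2, "b")], ["x", "y"])

    # One mode expression per column, mirroring the DataFrame.mode loop above;
    # all aggregates are evaluated together in a single pass over the data.
    mode_scols = [SF.mode(F.col(c), True).alias(c) for c in sdf.columns]
    sdf.select(*mode_scols).show()  # one row: [1] for x, ["b"] for y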
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index e1b4ac3a3e3..fa99ddf76ce 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -4731,12 +4731,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         3    NaN
         dtype: float64
         """
-        ser_count = self.value_counts(dropna=dropna, sort=False)
-        sdf_count = ser_count._internal.spark_frame
-        most_value = ser_count.max()
-        sdf_most_value = sdf_count.filter("count == {}".format(str(most_value)))
-        sdf = sdf_most_value.select(
-            F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME)
+        scol = self.spark.column
+        name = self._internal.data_spark_column_names[0]
+        sdf = (
+            self._internal.spark_frame.select(SF.mode(scol, dropna).alias(name))
+            .select(F.array_sort(F.col(name)).alias(name))
+            .select(F.explode(F.col(name)).alias(name))
         )
         internal = InternalFrame(spark_frame=sdf, index_spark_columns=None, column_labels=[None])
         ser_mode = first_series(DataFrame(internal))
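
The rewritten `Series.mode` no longer materializes value counts and re-filters them; the `PandasMode` aggregate returns an array of the most frequent values in one pass, which is then sorted for deterministic output and exploded back into one row per mode. A minimal sketch of that pipeline against a plain Spark DataFrame (assuming an active SparkSession):

    from pyspark.sql import SparkSession, functions as F
    from pyspark.pandas.spark import functions as SF

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1.0,), (2.0,), (2.0,), (None,)], ["v"])

    result = (
        sdf.select(SF.mode(F.col("v"), True).alias("v"))  # one-pass aggregate -> array of modes
        .select(F.array_sort(F.col("v")).alias("v"))      # deterministic ordering
        .select(F.explode(F.col("v")).alias("v"))         # one output row per mode value
    )
    result.show()  # a single row: 2.0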
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 11f9dbbb8c0..58715b5f781 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -46,6 +46,11 @@ def kurt(col: Column) -> Column:
     return Column(sc._jvm.PythonSQLUtils.pandasKurtosis(col._jc))
 
 
+def mode(col: Column, dropna: bool) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))
+
+
 def repeat(col: Column, n: Union[int, Column]) -> Column:
     """
     Repeats a string column n times, and returns it as a new string column.
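
The new helper follows the same wrapper pattern as its neighbors such as `kurt`: build the JVM-side expression through `PythonSQLUtils` and wrap it in a Python `Column`. The aggregate yields the modal values as an array (presumably unordered, given that `Series.mode` sorts it with `F.array_sort` before exploding). A quick schema check, as a sketch assuming an active SparkSession:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.pandas.spark import functions as SF

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1,), (1,), (2,)], ["v"])

    res = df.select(SF.mode(F.col("v"), True).alias("modes"))
    res.printSchema()  # "modes" is an array column holding the tied top values
    res.show()         # one row, e.g. [1]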


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org