Posted to commits@spark.apache.org by we...@apache.org on 2023/06/02 12:28:27 UTC

[spark] branch master updated: [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect

This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c3b62708cd6 [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
c3b62708cd6 is described below

commit c3b62708cd6371da08943e572a9bc0a45c1dcced
Author: Weichen Xu <we...@databricks.com>
AuthorDate: Fri Jun 2 20:28:01 2023 +0800

    [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
    
    ### What changes were proposed in this pull request?
    
    Drop vector type support in Distributed ML for spark connect.
    
    ### Why are the changes needed?
    
    Distributed ML is designed to support fitting and transforming over either a Spark DataFrame or a local pandas DataFrame.
    Currently, pandas DataFrames have no vector type analogous to `spark.ml.linalg.Vector`, and the vector type offers little advantage beyond saving space for sparse feature datasets.
    
    To keep the interface consistent, we decided that the initial version will not support the vector type.
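    
    As a sketch of the resulting usage (assuming an active `spark` session and the `MaxAbsScaler` API exercised in the tests below), feature columns are plain array columns, so the same estimator runs over a Spark DataFrame or its pandas equivalent:
    
        from pyspark.mlv2.feature import MaxAbsScaler
    
        df = spark.createDataFrame(
            [([2.0, 3.5, 1.5],), ([-3.0, -0.5, -2.5],)],
            schema=["features"],
        )
        scaler = MaxAbsScaler(inputCol="features", outputCol="scaled_features")
        model = scaler.fit(df)                   # fit over the Spark DataFrame
        local_model = scaler.fit(df.toPandas())  # or over the local pandas DataFrame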
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #41420 from WeichenXu123/mlv2-drop-vector-type-support.
    
    Authored-by: Weichen Xu <we...@databricks.com>
    Signed-off-by: Weichen Xu <we...@databricks.com>
---
 python/pyspark/mlv2/base.py                  |  6 +++++-
 python/pyspark/mlv2/tests/test_feature.py    | 16 +++++++---------
 python/pyspark/mlv2/tests/test_summarizer.py | 10 ++++------
 python/pyspark/mlv2/util.py                  | 19 -------------------
 4 files changed, 16 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py
index dc503db71c0..63631eccf2f 100644
--- a/python/pyspark/mlv2/base.py
+++ b/python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ class Transformer(Params, metaclass=ABCMeta):
     def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
         """
         Return a transformation function that accepts an instance of `pd.Series` as input and
-        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+        If there's only one output column, the transformed result must be an
+        instance of `pd.Series`; if there are multiple output columns, the transformed result
+        must be an instance of `pd.DataFrame` with column names matching the output schema
+        returned by the `_output_columns` interface.
         """
         raise NotImplementedError()
 
diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py
index 8bc9d4c2307..eed04217a6f 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,8 +21,6 @@ from distutils.version import LooseVersion
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.functions import vector_to_array
-from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession
 
@@ -35,8 +33,8 @@ class FeatureTestsMixin:
     def test_max_abs_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
             ],
             schema=["features"],
         )
@@ -49,7 +47,7 @@ class FeatureTestsMixin:
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
 
@@ -62,9 +60,9 @@ class FeatureTestsMixin:
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
-                (Vectors.dense([1.0, -1.5, 0.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
+                ([1.0, -1.5, 0.5],),
             ],
             schema=["features"],
         )
@@ -81,7 +79,7 @@ class FeatureTestsMixin:
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
 
diff --git a/python/pyspark/mlv2/tests/test_summarizer.py b/python/pyspark/mlv2/tests/test_summarizer.py
index e78510b8ff4..927ef0bdd5e 100644
--- a/python/pyspark/mlv2/tests/test_summarizer.py
+++ b/python/pyspark/mlv2/tests/test_summarizer.py
@@ -21,8 +21,6 @@ from distutils.version import LooseVersion
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.linalg import Vectors
-from pyspark.ml.functions import vector_to_array
 from pyspark.mlv2.summarizer import summarize_dataframe
 from pyspark.sql import SparkSession
 
@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
     def test_summarize_dataframe(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, -1.5]),),
-                (Vectors.dense([-3.0, 0.5]),),
-                (Vectors.dense([1.0, 3.5]),),
+                ([2.0, -1.5],),
+                ([-3.0, 0.5],),
+                ([1.0, 3.5],),
             ],
             schema=["features"],
         )
 
-        df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+        df1_local = df1.toPandas()
 
         result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
         result_local = summarize_dataframe(
diff --git a/python/pyspark/mlv2/util.py b/python/pyspark/mlv2/util.py
index 9aebb3fa9a3..a5c2dd6e3b6 100644
--- a/python/pyspark/mlv2/util.py
+++ b/python/pyspark/mlv2/util.py
@@ -66,17 +66,6 @@ def aggregate_dataframe(
         agg_state = local_agg_fn(dataframe)
         return agg_state_to_result(agg_state)
 
-    col_types = dict(dataframe.dtypes)
-
-    for col_name in input_col_names:
-        col_type = col_types[col_name]
-        if col_type == "vector":
-            from pyspark.ml.functions import vector_to_array
-
-            # pandas UDF does not support vector type for now,
-            # we convert it into vector type
-            dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
     dataframe = dataframe.select(*input_col_names)
 
     def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -172,14 +161,6 @@ def transform_dataframe_column(
         return transform_fn(s)
 
     input_col = col(input_col_name)
-    input_col_type = dict(dataframe.dtypes)[input_col_name]
-
-    if input_col_type == "vector":
-        from pyspark.ml.functions import vector_to_array
-
-        # pandas UDF does not support vector type for now,
-        # we convert it into vector type
-        input_col = vector_to_array(input_col)
 
     result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))
 

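For illustration, the `_get_transform_fn` contract clarified in base.py above admits transform functions of two shapes. A minimal sketch (the column names "scaled_features" and "norm" are hypothetical, not part of this commit):

    import pandas as pd

    # One output column: return a pd.Series.
    def transform_fn_single(s: pd.Series) -> pd.Series:
        return s.apply(lambda v: [x * 2.0 for x in v])

    # Multiple output columns: return a pd.DataFrame whose column names
    # match the output schema reported by `_output_columns`.
    def transform_fn_multi(s: pd.Series) -> pd.DataFrame:
        return pd.DataFrame({
            "scaled_features": s.apply(lambda v: [x * 2.0 for x in v]),
            "norm": s.apply(lambda v: sum(x * x for x in v) ** 0.5),
        })

    # Example: each element of the input series is one row's feature array.
    s = pd.Series([[2.0, 3.5, 1.5], [-3.0, -0.5, -2.5]])
    print(transform_fn_multi(s))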
