You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/06/02 12:28:27 UTC
[spark] branch master updated: [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
This is an automated email from the ASF dual-hosted git repository.
weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c3b62708cd6 [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
c3b62708cd6 is described below
commit c3b62708cd6371da08943e572a9bc0a45c1dcced
Author: Weichen Xu <we...@databricks.com>
AuthorDate: Fri Jun 2 20:28:01 2023 +0800
[SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
### What changes were proposed in this pull request?
Drop vector type support in Distributed ML for spark connect.
### Why are the changes needed?
Distributed ML is designed to support fitting/transforming over either a Spark DataFrame or a local pandas DataFrame.
Currently, pandas DataFrames have no type analogous to `spark.ml.linalg.Vector`, and the Vector type offers little advantage beyond compactly storing sparse feature datasets.
To keep the interface consistent, we decided that the initial version will not support the vector type.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #41420 from WeichenXu123/mlv2-drop-vector-type-support.
Authored-by: Weichen Xu <we...@databricks.com>
Signed-off-by: Weichen Xu <we...@databricks.com>
---
python/pyspark/mlv2/base.py | 6 +++++-
python/pyspark/mlv2/tests/test_feature.py | 16 +++++++---------
python/pyspark/mlv2/tests/test_summarizer.py | 10 ++++------
python/pyspark/mlv2/util.py | 19 -------------------
4 files changed, 16 insertions(+), 35 deletions(-)
diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py
index dc503db71c0..63631eccf2f 100644
--- a/python/pyspark/mlv2/base.py
+++ b/python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ class Transformer(Params, metaclass=ABCMeta):
def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
"""
Return a transformation function that accepts an instance of `pd.Series` as input and
- returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+ returns transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+ If there's only one output column, the transformed result must be an
+ instance of `pd.Series`, if there are multiple output columns, the transformed result
+ must be an instance of `pd.DataFrame` with column names matching output schema
+ returned by `_output_columns` interface.
"""
raise NotImplementedError()
diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py
index 8bc9d4c2307..eed04217a6f 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,8 +21,6 @@ from distutils.version import LooseVersion
import numpy as np
import pandas as pd
-from pyspark.ml.functions import vector_to_array
-from pyspark.ml.linalg import Vectors
from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
from pyspark.sql import SparkSession
@@ -35,8 +33,8 @@ class FeatureTestsMixin:
def test_max_abs_scaler(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, 3.5, 1.5]),),
- (Vectors.dense([-3.0, -0.5, -2.5]),),
+ ([2.0, 3.5, 1.5],),
+ ([-3.0, -0.5, -2.5],),
],
schema=["features"],
)
@@ -49,7 +47,7 @@ class FeatureTestsMixin:
np.testing.assert_allclose(list(result.scaled_features), expected_result)
- local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+ local_df1 = df1.toPandas()
local_fit_model = scaler.fit(local_df1)
local_transform_result = local_fit_model.transform(local_df1)
@@ -62,9 +60,9 @@ class FeatureTestsMixin:
def test_standard_scaler(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, 3.5, 1.5]),),
- (Vectors.dense([-3.0, -0.5, -2.5]),),
- (Vectors.dense([1.0, -1.5, 0.5]),),
+ ([2.0, 3.5, 1.5],),
+ ([-3.0, -0.5, -2.5],),
+ ([1.0, -1.5, 0.5],),
],
schema=["features"],
)
@@ -81,7 +79,7 @@ class FeatureTestsMixin:
np.testing.assert_allclose(list(result.scaled_features), expected_result)
- local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+ local_df1 = df1.toPandas()
local_fit_model = scaler.fit(local_df1)
local_transform_result = local_fit_model.transform(local_df1)
diff --git a/python/pyspark/mlv2/tests/test_summarizer.py b/python/pyspark/mlv2/tests/test_summarizer.py
index e78510b8ff4..927ef0bdd5e 100644
--- a/python/pyspark/mlv2/tests/test_summarizer.py
+++ b/python/pyspark/mlv2/tests/test_summarizer.py
@@ -21,8 +21,6 @@ from distutils.version import LooseVersion
import numpy as np
import pandas as pd
-from pyspark.ml.linalg import Vectors
-from pyspark.ml.functions import vector_to_array
from pyspark.mlv2.summarizer import summarize_dataframe
from pyspark.sql import SparkSession
@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
def test_summarize_dataframe(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, -1.5]),),
- (Vectors.dense([-3.0, 0.5]),),
- (Vectors.dense([1.0, 3.5]),),
+ ([2.0, -1.5],),
+ ([-3.0, 0.5],),
+ ([1.0, 3.5],),
],
schema=["features"],
)
- df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+ df1_local = df1.toPandas()
result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
result_local = summarize_dataframe(
diff --git a/python/pyspark/mlv2/util.py b/python/pyspark/mlv2/util.py
index 9aebb3fa9a3..a5c2dd6e3b6 100644
--- a/python/pyspark/mlv2/util.py
+++ b/python/pyspark/mlv2/util.py
@@ -66,17 +66,6 @@ def aggregate_dataframe(
agg_state = local_agg_fn(dataframe)
return agg_state_to_result(agg_state)
- col_types = dict(dataframe.dtypes)
-
- for col_name in input_col_names:
- col_type = col_types[col_name]
- if col_type == "vector":
- from pyspark.ml.functions import vector_to_array
-
- # pandas UDF does not support vector type for now,
- # we convert it into vector type
- dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
dataframe = dataframe.select(*input_col_names)
def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -172,14 +161,6 @@ def transform_dataframe_column(
return transform_fn(s)
input_col = col(input_col_name)
- input_col_type = dict(dataframe.dtypes)[input_col_name]
-
- if input_col_type == "vector":
- from pyspark.ml.functions import vector_to_array
-
- # pandas UDF does not support vector type for now,
- # we convert it into vector type
- input_col = vector_to_array(input_col)
result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org