Posted to reviews@spark.apache.org by "WeichenXu123 (via GitHub)" <gi...@apache.org> on 2023/10/03 09:53:08 UTC

[PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

WeichenXu123 opened a new pull request, #43199:
URL: https://github.com/apache/spark/pull/43199

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348201944


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import VectorAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = VectorAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,

Review Comment:
   See https://github.com/apache/spark/pull/43199#issuecomment-1749926141




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348200916


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.

Review Comment:
   this assembler outputs array type :) we don't need to keep exactly the same behavior as the old one.




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348201048


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.

Review Comment:
   What about renaming the class to ArrayAssembler?




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348225386


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):
+            raise ValueError(
+                "'feature_size_list' param must be set with an array of integer, and"
+                "its length must be equal to number of input columns."
+            )
+        assembled_feature_size = sum(feature_size_list)
+        handler_invalid = self.getHandleInvalid()
+
+        if handler_invalid not in ["error", "keep"]:
+            raise ValueError("'handleInvalid' param must be set to 'error' or 'keep'.")
+
+        keep_invalid = handler_invalid == "keep"
+
+        def assemble_features(*feature_list: Any):
+            assembled_array = np.empty(assembled_feature_size, dtype=np.float64)
+            pos = 0
+            for index, feature in enumerate(feature_list):
+                feature_size = feature_size_list[index]
+
+                if keep_invalid:
+                    if feature is None:
+                        assembled_array[pos : pos + feature_size] = np.nan
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature
+                else:
+                    if feature is None or np.isnan(feature).any():
+                        raise ValueError(
+                            f"The input features contains invalid value: {str(feature)}"
+                        )
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature
+
+                pos += feature_size
+
+            return assembled_array
+
+        def transform_fn(*series_list: Any) -> Any:
+            return pd.Series(assemble_features(*feature_list) for feature_list in zip(*series_list))
+
+        return transform_fn
+
+
+# Override the doc of the ArrayAssembler.handleInvalid param.
+ArrayAssembler.handleInvalid.doc = (
+    "how to handle invalid entries. Options are 'error' (throw an error), "
+    "or 'keep' (return the relevant number of NaNs in the output). Default value "
+    "is 'error'."
+)

Review Comment:
   why do we need this?



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)

Review Comment:
   this doc example is not complete?
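
   For reference, a hedged sketch of how the example could be completed, assuming the semantics described in the docstring (booleans cast to 1.0/0.0, NaN kept under handleInvalid="keep"); the exact `show` formatting is illustrative, not verified output:

   ```python
   >>> assembler.transform(spark_df).select("out").show(truncate=False)
   +--------------------------------+
   |out                             |
   +--------------------------------+
   |[2.0, 3.5, 1.5, 3.0, 1.0, 1.0]  |
   |[-3.0, NaN, -2.5, 4.0, 0.0, 2.0]|
   +--------------------------------+
   ```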



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,

Review Comment:
   the name seems a bit long, what about renaming it `featureSizes`? but I don't feel strongly about it



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):
+            raise ValueError(
+                "'feature_size_list' param must be set with an array of integer, and"
+                "its length must be equal to number of input columns."
+            )
+        assembled_feature_size = sum(feature_size_list)
+        handler_invalid = self.getHandleInvalid()
+
+        if handler_invalid not in ["error", "keep"]:
+            raise ValueError("'handleInvalid' param must be set to 'error' or 'keep'.")
+
+        keep_invalid = handler_invalid == "keep"
+
+        def assemble_features(*feature_list: Any):
+            assembled_array = np.empty(assembled_feature_size, dtype=np.float64)
+            pos = 0
+            for index, feature in enumerate(feature_list):
+                feature_size = feature_size_list[index]
+
+                if keep_invalid:
+                    if feature is None:
+                        assembled_array[pos : pos + feature_size] = np.nan
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature
+                else:
+                    if feature is None or np.isnan(feature).any():

Review Comment:
   Not related to this PR, but it seems the existing impl of `VectorAssembler` doesn't validate NaN values in Vectors.
   Let me double check and fix it



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",

Review Comment:
   shouldn't `inputFeatureSizeList` and `handleInvalid` be non-optional?



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(

Review Comment:
   add it to `pyspark.ml.connect.rst`



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):

Review Comment:
   since `ndarray` and `List` support negative indices, e.g. `assembled_array[-30: -20]`,
   
   I think we may need to check that all feature sizes are > 0
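
   A minimal sketch of that validation, reusing the names from this diff (hypothetical placement inside `_get_transform_fn`):

   ```python
   # Hypothetical sketch: reject non-positive sizes so that a slice like
   # assembled_array[pos : pos + feature_size] can never wrap around
   # through negative indexing.
   if any(size <= 0 for size in feature_size_list):
       raise ValueError("'inputFeatureSizeList' param must only contain positive integers.")
   ```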



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):
+            raise ValueError(
+                "'feature_size_list' param must be set with an array of integer, and"
+                "its length must be equal to number of input columns."
+            )
+        assembled_feature_size = sum(feature_size_list)
+        handler_invalid = self.getHandleInvalid()
+
+        if handler_invalid not in ["error", "keep"]:
+            raise ValueError("'handleInvalid' param must be set to 'error' or 'keep'.")
+
+        keep_invalid = handler_invalid == "keep"
+
+        def assemble_features(*feature_list: Any):
+            assembled_array = np.empty(assembled_feature_size, dtype=np.float64)
+            pos = 0
+            for index, feature in enumerate(feature_list):
+                feature_size = feature_size_list[index]
+
+                if keep_invalid:
+                    if feature is None:
+                        assembled_array[pos : pos + feature_size] = np.nan
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature

Review Comment:
   do we need to check the array size here?
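
   A hedged sketch of what such a check could look like in the `keep_invalid` branch (hypothetical, not part of this PR):

   ```python
   # Hypothetical sketch: verify the declared size matches the actual value
   # before the slice assignment silently broadcasts a scalar or raises a
   # shape error deep inside numpy.
   if feature is not None:
       actual_size = 1 if np.isscalar(feature) else len(feature)
       if actual_size != feature_size:
           raise ValueError(
               f"Input column at index {index} has size {actual_size}, "
               f"but 'inputFeatureSizeList' declares {feature_size}."
           )
   ```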




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #43199:
URL: https://github.com/apache/spark/pull/43199#issuecomment-1757472618

   please also update the PR title and description :)



Re: [PR] [SPARK-45397][ML][CONNECT] Add array assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 closed pull request #43199: [SPARK-45397][ML][CONNECT] Add array assembler feature transformer
URL: https://github.com/apache/spark/pull/43199



Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #43199:
URL: https://github.com/apache/spark/pull/43199#issuecomment-1749926141

   > It seems it is not consistent with existing impl: 1, existing impl accepts double type and vector type, this pr accepts double type and array type; 2, existing impl outputs vector type, this pr outputs array type; 3, existing impl doesn't have parameter `inputFeatureSizeList`
   
   Yes, these differences are by design:
   
   > existing impl accepts double type and vector type; existing impl outputs vector type, this pr outputs array type; 
   
   All other Spark Connect ML estimators / transformers only support array type feature input for now; the reason is that Spark Connect ML needs to support either a Spark DataFrame or a pandas DataFrame, and we haven't worked out how to support the "ml.Vector" type in a pandas DataFrame. We could design and support it in the future.
   
   > existing impl doesn't have parameter `inputFeatureSizeList`
   
   Yes, I added `inputFeatureSizeList` to simplify the implementation: the legacy VectorAssembler uses "column metadata" or checks the dataset's first row to get the feature size of the input columns, and we can avoid that extra first-row read by setting `inputFeatureSizeList`.
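
   For illustration, a hedged sketch of the first-row probe that `inputFeatureSizeList` avoids (hypothetical helper, not part of this PR):

   ```python
   # Hypothetical sketch: without inputFeatureSizeList, the transformer would
   # have to run an extra job just to learn each input column's feature size.
   def infer_feature_sizes(df, input_cols):
       first_row = df.select(*input_cols).first()  # extra round trip to the cluster
       return [
           len(value) if isinstance(value, (list, tuple)) else 1
           for value in (first_row[c] for c in input_cols)
       ]
   ```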
   



Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348144761


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.

Review Comment:
   ```suggestion
       A feature transformer that merges multiple input columns into a vector type column.
   ```



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+    You need to set param `inputCols` to specify the input column names,

Review Comment:
   What about having a `Parameters` section for those parameters?



##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,124 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class VectorAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import VectorAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = VectorAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,

Review Comment:
   existing impl doesn't have this param




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1352780209


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):
+            raise ValueError(
+                "'feature_size_list' param must be set with an array of integer, and"
+                "its length must be equal to number of input columns."
+            )
+        assembled_feature_size = sum(feature_size_list)
+        handler_invalid = self.getHandleInvalid()
+
+        if handler_invalid not in ["error", "keep"]:
+            raise ValueError("'handleInvalid' param must be set to 'error' or 'keep'.")
+
+        keep_invalid = handler_invalid == "keep"
+
+        def assemble_features(*feature_list: Any):
+            assembled_array = np.empty(assembled_feature_size, dtype=np.float64)
+            pos = 0
+            for index, feature in enumerate(feature_list):
+                feature_size = feature_size_list[index]
+
+                if keep_invalid:
+                    if feature is None:
+                        assembled_array[pos : pos + feature_size] = np.nan
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature
+                else:
+                    if feature is None or np.isnan(feature).any():
+                        raise ValueError(
+                            f"The input features contains invalid value: {str(feature)}"
+                        )
+                    else:
+                        assembled_array[pos : pos + feature_size] = feature
+
+                pos += feature_size
+
+            return assembled_array
+
+        def transform_fn(*series_list: Any) -> Any:
+            return pd.Series(assemble_features(*feature_list) for feature_list in zip(*series_list))
+
+        return transform_fn
+
+
+# Override the doc of the ArrayAssembler.handleInvalid param.
+ArrayAssembler.handleInvalid.doc = (
+    "how to handle invalid entries. Options are 'error' (throw an error), "
+    "or 'keep' (return the relevant number of NaNs in the output). Default value "
+    "is 'error'."
+)

Review Comment:
   To override the doc of the `handleInvalid` param.
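
   A minimal usage sketch of where the overridden doc surfaces, assuming the standard `explainParam` helper from `pyspark.ml.param.Params` applies to these connect classes:

   ```python
   # The specialized text shows up in param explanations after the override:
   assembler = ArrayAssembler()
   print(assembler.explainParam("handleInvalid"))
   # handleInvalid: how to handle invalid entries. Options are 'error' ... (default: error)
   ```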




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1352779411


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)

Review Comment:
   We don't enable doctest, so this example code should be fine. 





Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348296838


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` for specifying input column names,
+    and set param `inputFeatureSizeList` for specifying corresponding input column
+    feature size, for scalar type input column, corresponding feature size must be set to 1,
+    otherwise, set corresponding feature size to feature array length.
+    Output column is "array<double"> type and contains array of assembled features.
+    All elements in input feature columns must be convertible to double type.
+
+    You can set 'handler_invalid' param to specify how to handle invalid input value
+    (None or NaN), if it is set to 'error', error is thrown for invalid input value,
+    if it is set to 'keep', it returns relevant number of NaN in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",
+    ) -> None:
+        """
+        __init__(self, \\*, inputCols=None, outputCol=None, inputFeatureSizeList=None, handleInvalid="error")
+        """
+        super().__init__()
+        kwargs = self._input_kwargs
+        self._set(**kwargs)
+        self._setDefault(handleInvalid="error")
+
+    def _input_columns(self) -> List[str]:
+        return self.getInputCols()
+
+    def _output_columns(self) -> List[Tuple[str, str]]:
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self) -> Callable[..., Any]:
+        feature_size_list = self.getInputFeatureSizeList()
+        if feature_size_list is None or len(feature_size_list) != len(self.getInputCols()):

Review Comment:
   Good point!




Re: [PR] [SPARK-45397][ML][CONNECT] Add vector assembler feature transformer [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348303794


##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
         self.scale_values = sk_model.scale_
         self.mean_values = sk_model.mean_
         self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+    Transformer,
+    HasInputCols,
+    HasOutputCol,
+    HasInputFeatureSizeList,
+    HasHandleInvalid,
+    ParamsReadWrite,
+):
+    """
+    A feature transformer that merges multiple input columns into an array type column.
+
+    Parameters
+    ----------
+    You need to set param `inputCols` to specify the input column names,
+    and set param `inputFeatureSizeList` to specify the corresponding input column
+    feature sizes; for a scalar type input column, the corresponding feature size must be
+    set to 1, otherwise, set the corresponding feature size to the feature array length.
+    The output column is of "array<double>" type and contains the array of assembled features.
+    All elements in the input feature columns must be convertible to double type.
+
+    You can set the 'handleInvalid' param to specify how to handle invalid input values
+    (None or NaN): if it is set to 'error', an error is thrown for invalid input values;
+    if it is set to 'keep', the relevant number of NaNs is kept in the output.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> from pyspark.ml.connect.feature import ArrayAssembler
+    >>> import numpy as np
+    >>>
+    >>> spark_df = spark.createDataFrame(
+    ...     [
+    ...         ([2.0, 3.5, 1.5], 3.0, True, 1),
+    ...         ([-3.0, np.nan, -2.5], 4.0, False, 2),
+    ...     ],
+    ...     schema=["f1", "f2", "f3", "f4"],
+    ... )
+    >>> assembler = ArrayAssembler(
+    ...     inputCols=["f1", "f2", "f3", "f4"],
+    ...     outputCol="out",
+    ...     inputFeatureSizeList=[3, 1, 1, 1],
+    ...     handleInvalid="keep",
+    ... )
+    >>> assembler.transform(spark_df).select("out").show(truncate=False)
+    """
+
+    _input_kwargs: Dict[str, Any]
+
+    @keyword_only
+    def __init__(
+        self,
+        *,
+        inputCols: Optional[List[str]] = None,
+        outputCol: Optional[str] = None,
+        inputFeatureSizeList: Optional[List[int]] = None,
+        handleInvalid: Optional[str] = "error",

Review Comment:
   But we should allow the constructor to be called with zero arguments, like other estimators/transformers. The user can set params later.
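
   A minimal sketch of that pattern, assuming the generic `set` helper from `pyspark.ml.param.Params` is available on these connect classes:

   ```python
   # Construct with zero arguments, then set params afterwards:
   assembler = ArrayAssembler()
   assembler.set(assembler.inputCols, ["f1", "f2"])
   assembler.set(assembler.outputCol, "out")
   assembler.set(assembler.inputFeatureSizeList, [3, 1])
   ```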


