You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/01 11:15:04 UTC

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r960524940


##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +112,170 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |
+    | multi-col tensor   | N/A          | 4,2             |
+
+
+    Notes:
+    1. pass thru dataframe column => model input as single numpy array.
+    2. reshape flattened tensors into expected tensor shapes.
+    3. convert entire dataframe into single numpy array via df.to_numpy(), or user can use
+       `pyspark.sql.functions.array()` to transform the input into a single-col tensor first.
+    4. pass thru dataframe column => model input as an (ordered) dictionary of numpy arrays.
+
+    Parameters
+    ----------
+    predict_batch_fn : Callable
+        Function which is responsible for loading a model and returning a `predict` function.
+    return_type : DataType
+        Spark SQL datatype for the expected output.
+        Default: ArrayType(FloatType())
+    batch_size : int
+        Batch size to use for inference, note that this is typically a limitation of the model
+        and/or the hardware resources and is usually smaller than the Spark partition size.
+        Default: -1, which sends the entire Spark partition to the model.
+    input_names: list[str]
+        Optional list of input names which will be used to map DataFrame column names to model
+        input names.  The order of names must match the order of the selected DataFrame columns.
+        If provided, the `predict()` function will be passed a dictionary of named inputs.
+    input_tensor_shapes: list[list[int]]
+        Optional list of input tensor shapes for models with tensor inputs.  Each tensor
+        input must be represented as a single DataFrame column containing a flattened 1-D array.
+        The order of the tensor shapes must match the order of the selected DataFrame columns.
+        Tabular datasets with scalar-valued columns should not supply this argument.
+
+    Returns
+    -------
+    A pandas_udf for predicting a batch.
+    """
+    # generate a new uuid each time this is invoked on the driver to invalidate executor-side cache.
+    model_uuid = uuid.uuid4()
+
+    def predict(data: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
+        import pyspark.ml.executor_globals as exec_global
+
+        if exec_global.predict_fn and exec_global.model_uuid == model_uuid:
+            predict_fn = exec_global.predict_fn
+        else:
+            predict_fn = predict_batch_fn(**kwargs)
+            exec_global.predict_fn = predict_fn
+            exec_global.model_uuid = model_uuid
+
+        for partition in data:
+            has_tensors = has_tensor_cols(partition)
+            for batch in batched(partition, batch_size):
+                inputs: Union[np.ndarray, dict[str, np.ndarray]]
+                if input_names:
+                    # input names provided, expect a dictionary of named numpy arrays
+                    # check if the number of inputs matches expected
+                    num_expected = len(input_names)
+                    num_actual = len(batch.columns)
+                    if num_actual != num_expected:
+                        msg = "Model expected {} inputs, but received {} columns"
+                        raise ValueError(msg.format(num_expected, num_actual))
+
+                    # rename dataframe column names to match model input names, if needed
+                    if input_names != list(batch.columns):
+                        batch.columns = input_names
+
+                    if has_tensors:
+                        raise ValueError("Tensor columns require an input_tensor_shape")
+
+                    # create a dictionary of named inputs
+                    inputs_dict = batch.to_dict(orient="series")
+
+                    # reshape inputs, if needed
+                    if input_tensor_shapes:
+                        if len(input_tensor_shapes) == num_actual:
+                            for i, (k, v) in enumerate(inputs_dict.items()):
+                                inputs_dict[k] = v.reshape(input_tensor_shapes[i])  # type: ignore
+                        else:
+                            raise ValueError("input_tensor_shapes must match columns")
+
+                    inputs = inputs_dict  # type: ignore
+                else:
+                    # no input names provided, expect a single numpy array
+                    if input_tensor_shapes:
+                        if len(input_tensor_shapes) == 1:
+                            if len(batch.columns) == 1:
+                                # if one tensor input and one column, vstack and reshape the batch
+                                input_shape = input_tensor_shapes[0]
+                                input_shape[0] = -1  # replace None with -1 in batch dimension
+                                # input = np.vstack(batch).reshape(input_shape)     # name, col
+                                inputs = np.vstack(batch.iloc[:, 0]).reshape(input_shape)  # struct

Review Comment:
   So the `input_shape` (comes from `input_tensor_shapes` argument)  must be [-1, xx, ..], first dimension should always be `-1`.
   I think we can let user pass shape arguments without the first `-1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org