You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/05/18 20:29:39 UTC

[beam] branch users/damccorm/pre-post-dlq-docs created (now 9826805b9c6)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a change to branch users/damccorm/pre-post-dlq-docs
in repository https://gitbox.apache.org/repos/asf/beam.git


      at 9826805b9c6 Add docs on pre/post processing operations and dlq support

This branch includes the following new commits:

     new 9826805b9c6 Add docs on pre/post processing operations and dlq support

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Add docs on pre/post processing operations and dlq support

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/pre-post-dlq-docs
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9826805b9c6b761d723ee677fd3fcaae56056b5f
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu May 18 16:29:31 2023 -0400

    Add docs on pre/post processing operations and dlq support
---
 .../documentation/sdks/python-machine-learning.md  | 73 +++++++++++++++++++---
 1 file changed, 64 insertions(+), 9 deletions(-)

diff --git a/website/www/site/content/en/documentation/sdks/python-machine-learning.md b/website/www/site/content/en/documentation/sdks/python-machine-learning.md
index ccd64c3412f..6140b1d10be 100644
--- a/website/www/site/content/en/documentation/sdks/python-machine-learning.md
+++ b/website/www/site/content/en/documentation/sdks/python-machine-learning.md
@@ -192,7 +192,9 @@ with pipeline as p:
 
 For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
 
-## Use a keyed ModelHandler
+## RunInference Patterns
+
+### Use a keyed ModelHandler
 
 If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object:
 
@@ -212,7 +214,7 @@ If you are unsure if your data is keyed, you can also use `MaybeKeyedModelHandle
 
 For more information, see [`KeyedModelHander`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).
 
-## Use the `PredictionResult` object
+### Use the `PredictionResult` object
 
 When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both these items in the output allows you to find the input that determined the predictions.
 
@@ -245,12 +247,7 @@ from apache_beam.ml.inference.base import PredictionResult
 
 For more information, see the [`PredictionResult` documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L65).
 
-## Run a machine learning pipeline
-
-For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the
-[Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub.
-
-## Automatic model refresh
+### Automatic model refresh
 To automatically update the models used with the RunInference `PTransform` without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input `PCollection` to the RunInference input parameter `model_metadata_pcoll`.
 
 `ModelMetdata` is a `NamedTuple` containing:
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:
+
+```
+main, other = pcoll | RunInference(model_handler).with_exception_handling()
+other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
+```
+
+You can also apply this pattern to RunInference transforms with associated pre/postprocessing operations:
+
+```
+main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
+other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
+other.failed_inferences | beam.Map(print) # handles failed inferences
+other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied
+```
+
+### Run inference from a Java pipeline
 
 The RunInference API is available with the Beam Java SDK versions 2.41.0 and later through Apache Beam's [Multi-language Pipelines framework](/documentation/programming-guide/#multi-language-pipelines). For information about the Java wrapper transform, see [RunInference.java](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java). To try it out, see the [Java Sklearn Mnist Classification exa [...]