Posted to github@beam.apache.org by "damccorm (via GitHub)" <gi...@apache.org> on 2023/05/18 20:30:50 UTC

[GitHub] [beam] damccorm opened a new pull request, #26772: Add docs on pre/post processing operations and dlq support

damccorm opened a new pull request, #26772:
URL: https://github.com/apache/beam/pull/26772

   This adds docs for the DLQ and pre/postprocess features added in #26309 and #26261
   
   This shouldn't be merged until 2.48 is released
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm merged pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #26772:
URL: https://github.com/apache/beam/pull/26772




[GitHub] [beam] damccorm commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199190129


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,9 @@ with pipeline as p:
 
 For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
 
-## Use a keyed ModelHandler
+## RunInference Patterns

Review Comment:
   Done - let me know if you think it sounds ok: "This section suggests patterns and best practices that can be used to make your inference pipelines simpler, more robust, and more efficient."





[GitHub] [beam] damccorm commented on pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1555024259

   I'll merge this once 2.48 is released




[GitHub] [beam] rszper commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "rszper (via GitHub)" <gi...@apache.org>.
rszper commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1198306235


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference

Review Comment:
   I think we could simplify this header. The additional information is in the description.
   
   ```suggestion
   ### Preprocess and postprocess your records
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,9 @@ with pipeline as p:
 
 For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
 
-## Use a keyed ModelHandler
+## RunInference Patterns

Review Comment:
   We should include text after this header describing the section. Something along the lines of:
   
   This section provides examples of RunInference patterns.
   
   (But hopefully more informative than that.)



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:

Review Comment:
   ```suggestion
   To apply postprocessing operations, use `with_postprocess_fn` on your model handler:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+You can also chain multiple pre/postprocess operations:

Review Comment:
   ```suggestion
   You can also chain multiple pre- and postprocessing operations:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.

Review Comment:
   ```suggestion
   inference result in the order of first applied to last applied.
   ```
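
To make that ordering concrete: a minimal editorial sketch, not from the PR, using a toy `AddTenModelHandler` invented for illustration (a real pipeline would use one of Beam's built-in handlers):

```
import apache_beam as beam
from apache_beam.ml.inference.base import ModelHandler, RunInference

class AddTenModelHandler(ModelHandler[int, int, object]):
    """Toy stand-in for a real model: 'inference' just adds 10."""
    def load_model(self):
        return None  # no real model needed for this sketch

    def run_inference(self, batch, model, inference_args=None):
        return [x + 10 for x in batch]

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([1])
        | RunInference(
            AddTenModelHandler()
            .with_postprocess_fn(lambda x: x + 1)   # applied first, runs first
            .with_postprocess_fn(lambda x: x * 2))  # applied second, runs second
        | beam.Map(print))  # 1 -> 11 (inference) -> 12 (+1) -> 24 (*2)
```

The printed value should be 24, not 22, confirming the first-applied-to-last-applied order.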



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.

Review Comment:
   ```suggestion
   `PCollection` in the order of last applied to first applied.
   ```
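
The preprocessing direction is the mirror image. A companion sketch, again invented for illustration with a toy identity handler:

```
import apache_beam as beam
from apache_beam.ml.inference.base import ModelHandler, RunInference

class EchoModelHandler(ModelHandler[str, str, object]):
    """Toy stand-in for a real model: 'inference' is the identity."""
    def load_model(self):
        return None

    def run_inference(self, batch, model, inference_args=None):
        return list(batch)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(["a"])
        | RunInference(
            EchoModelHandler()
            .with_preprocess_fn(lambda s: s + "1")   # applied first, runs second
            .with_preprocess_fn(lambda s: s + "2"))  # applied last, runs first
        | beam.Map(print))  # "a" -> "a2" -> "a21" -> echoed by inference
```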



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:

Review Comment:
   ```suggestion
   To apply preprocessing operations, use `with_preprocess_fn` on your model handler:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The preprocessing function will be run before batching/inference and should map your input PCollection

Review Comment:
   ```suggestion
   The preprocessing function is run before batching and inference. This function maps your input `PCollection`
   ```
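
A concrete (hypothetical) instance of that type mapping: a scikit-learn handler expects numpy arrays, so the preprocessing function adapts the pipeline's raw dicts into that input type. The model path below is a placeholder:

```
import numpy as np
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Placeholder model path; the handler's base input type is numpy.ndarray.
handler = SklearnModelHandlerNumpy(model_uri="gs://my-bucket/model.pkl")

with beam.Pipeline() as p:
    predictions = (
        p
        | beam.Create([{"height": 1.8, "weight": 80.0}])
        | RunInference(
            # Map each raw dict to the array the base handler expects.
            handler.with_preprocess_fn(
                lambda row: np.array([row["height"], row["weight"]]))))
```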



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original

Review Comment:
   ```suggestion
   to your desired output type. If you apply multiple postprocessing functions, they run on your original
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+RunInference supports adding pre and post processing operations as part of your transform.

Review Comment:
   ```suggestion
   With RunInference, you can add preprocessing and postprocessing operations to your transform.
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original

Review Comment:
   ```suggestion
   to the base input type of the model handler. If you apply multiple preprocessing functions, they run on your original
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+The postprocessing function will be run after inference and should map the base ModelHandler's output

Review Comment:
   ```suggestion
   The postprocessing function runs after inference. This function maps the output type of the base model handler
   ```
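
In practice "the output type of the base model handler" is usually `PredictionResult`, which carries the original example alongside the model's inference. A small sketch of a flattening postprocessor (the concrete `model_handler` is assumed, not defined here):

```
import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult, RunInference

def to_pair(result: PredictionResult):
    # PredictionResult pairs the input example with the model's inference.
    return (result.example, result.inference)

# Usage, assuming some concrete `model_handler` and an input `pcoll`:
#   outputs = pcoll | RunInference(model_handler.with_postprocess_fn(to_pair))
```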



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:

Review Comment:
   ```suggestion
   RunInference has built-in support for dead-letter queues. You can use a dead-letter queue by applying `with_exception_handling` to your RunInference transform:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:
+
+```
+main, other = pcoll | RunInference(model_handler).with_exception_handling()
+other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
+```
+
+You can also apply this pattern to RunInference transforms with associated pre/postprocessing operations:

Review Comment:
   ```suggestion
   You can also apply this pattern to RunInference transforms with associated pre- and postprocessing operations:
   ```
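
The combined form isn't shown in this thread. A sketch of what it might look like, assuming the 2.48 API exposes per-stage failure collections named `failed_preprocessing` and `failed_postprocessing` (an assumption worth verifying against the released docs; `pcoll`, `model_handler`, `preprocess`, and `postprocess` are also assumed):

```
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference

main, other = (
    pcoll
    | RunInference(
        model_handler
        .with_preprocess_fn(preprocess)
        .with_postprocess_fn(postprocess)).with_exception_handling())

# Per-stage failure outputs (attribute names are assumptions, see above):
other.failed_preprocessing[0] | beam.Map(print)   # failed in preprocessing
other.failed_inferences | beam.Map(print)         # failed in inference
other.failed_postprocessing[0] | beam.Map(print)  # failed in postprocessing
```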



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.

Review Comment:
   ```suggestion
   To handle errors robustly while using RunInference, you can use a _dead-letter queue_. The dead-letter queue outputs failed records into a separate `PCollection` for further processing.
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
[...]
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.

Review Comment:
   ```suggestion
   This `PCollection` can then be analyzed and sent to a storage system, where it can be reviewed and resubmitted to the pipeline, or discarded.
   ```
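
One way to realize "sent to a storage system" is a text sink on the failure output. A minimal sketch, with the bucket path as a placeholder and `pcoll`/`model_handler` assumed:

```
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference

main, other = pcoll | RunInference(model_handler).with_exception_handling()

_ = (
    other.failed_inferences
    | beam.Map(repr)  # serialize each failed record for later review
    | beam.io.WriteToText("gs://my-bucket/dead-letter/failed"))
```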





[GitHub] [beam] rszper commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "rszper (via GitHub)" <gi...@apache.org>.
rszper commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199211452


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,12 @@ with pipeline as p:
 
 For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
 
-## Use a keyed ModelHandler
+## RunInference Patterns
+
+This section suggests patterns and best practices that can be used to make your inference pipelines simpler,

Review Comment:
   ```suggestion
   This section suggests patterns and best practices that you can use to make your inference pipelines simpler,
   ```





[GitHub] [beam] github-actions[bot] commented on pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1553627473

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] damccorm commented on pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1553626572

   R: @rszper 




[GitHub] [beam] damccorm commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199220852


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,12 @@ with pipeline as p:
[...]
+This section suggests patterns and best practices that can be used to make your inference pipelines simpler,

Review Comment:
   Thanks!


