Posted to github@beam.apache.org by "damccorm (via GitHub)" <gi...@apache.org> on 2023/05/18 20:30:50 UTC
[GitHub] [beam] damccorm opened a new pull request, #26772: Add docs on pre/post processing operations and dlq support
damccorm opened a new pull request, #26772:
URL: https://github.com/apache/beam/pull/26772
This adds docs for the DLQ and pre/postprocess features added in #26309 and #26261
This shouldn't be merged until 2.48 is released
------------------------
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch)
------------------------------------------------------------------------------------------------
[![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
[![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] damccorm merged pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #26772:
URL: https://github.com/apache/beam/pull/26772
[GitHub] [beam] damccorm commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199190129
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,9 @@ with pipeline as p:
For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
-## Use a keyed ModelHandler
+## RunInference Patterns
Review Comment:
Done - let me know if you think it sounds ok: "This section suggests patterns and best practices that can be used to make your inference pipelines simpler, more robust, and more efficient."
[GitHub] [beam] damccorm commented on pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1555024259
I'll merge this once 2.48 is released
[GitHub] [beam] rszper commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "rszper (via GitHub)" <gi...@apache.org>.
rszper commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1198306235
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
**Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
Review Comment:
I think we could simplify this header. The additional information is in the description.
```suggestion
### Preprocess and postprocess your records
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,9 @@ with pipeline as p:
For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
-## Use a keyed ModelHandler
+## RunInference Patterns
Review Comment:
We should include text after this header describing the section. Something along the lines of:
This section provides examples of RunInference patterns.
(But hopefully more informative than that.)
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
**Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
Review Comment:
```suggestion
To apply postprocessing operations, use `with_postprocess_fn` on your model handler:
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
**Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
Review Comment:
```suggestion
You can also chain multiple pre- and postprocessing operations:
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
**Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+ model_handler.with_preprocess_fn(
+ lambda x : do_something(x)
+ ).with_preprocess_fn(
+ lambda x : do_something_else(x)
+ ).with_postprocess_fn(
+ lambda x : do_something_after_inference(x)
+ ).with_postprocess_fn(
+ lambda x : do_something_else_after_inference(x)
+ ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
Review Comment:
```suggestion
inference result in the order of first applied to last applied.
```
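The "last applied to first applied" ordering discussed above can be illustrated with a small pure-Python sketch. `FakeModelHandler` is a hypothetical stand-in written for this example; it mimics only the documented composition order, not the real `apache_beam` API:

```python
class FakeModelHandler:
    """Hypothetical stand-in for a Beam ModelHandler; illustrates only the
    documented pre/postprocess composition order, not Beam internals."""

    def __init__(self, pre=None, post=None):
        self.pre = pre or []
        self.post = post or []

    def with_preprocess_fn(self, fn):
        # Later-applied preprocess fns run FIRST on the input, so prepend.
        return FakeModelHandler([fn] + self.pre, self.post)

    def with_postprocess_fn(self, fn):
        # Later-applied postprocess fns run LAST on the result, so append.
        return FakeModelHandler(self.pre, self.post + [fn])

    def run(self, x):
        for fn in self.pre:
            x = fn(x)
        result = x * 2  # stand-in for the actual inference call
        for fn in self.post:
            result = fn(result)
        return result

handler = (FakeModelHandler()
           .with_preprocess_fn(lambda x: x + 1)      # applied first, runs second
           .with_preprocess_fn(lambda x: x * 10)     # applied second, runs first
           .with_postprocess_fn(lambda r: r - 2)     # applied first, runs first
           .with_postprocess_fn(lambda r: r * 100))  # applied second, runs last

print(handler.run(3))  # 3*10=30 -> 30+1=31 -> infer 31*2=62 -> 62-2=60 -> 6000
```

Tracing `run(3)` shows why the asymmetry matters: the preprocess chain is applied in reverse order of declaration, while the postprocess chain is applied in declaration order.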
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
Review Comment:
```suggestion
`PCollection` in the order of last applied to first applied.
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
Review Comment:
```suggestion
To apply preprocessing operations, use `with_preprocess_fn` on your model handler:
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+The preprocessing function will be run before batching/inference and should map your input PCollection
Review Comment:
```suggestion
The preprocessing function is run before batching and inference. This function maps your input `PCollection`
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
Review Comment:
```suggestion
to your desired output type. If you apply multiple postprocessing functions, they run on your original
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+RunInference supports adding pre and post processing operations as part of your transform.
Review Comment:
```suggestion
With RunInference, you can add preprocessing and postprocessing operations to your transform.
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
Review Comment:
```suggestion
to the base input type of the model handler. If you apply multiple preprocessing functions, they run on your original
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+The postprocessing function will be run after inference and should map the base ModelHandler's output
Review Comment:
```suggestion
The postprocessing function runs after inference. This function maps the output type of the base model handler
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:
Review Comment:
```suggestion
RunInference has built-in support for dead-letter queues. You can use a dead-letter queue by applying `with_exception_handling` to your RunInference transform:
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
+```
+main, other = pcoll | RunInference(model_handler).with_exception_handling()
+other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
+```
+
+You can also apply this pattern to RunInference transforms with associated pre/postprocessing operations:
Review Comment:
```suggestion
You can also apply this pattern to RunInference transforms with associated pre- and postprocessing operations:
```
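The dead-letter-queue pattern quoted above can be sketched without Beam as a simple success/failure partition. `run_with_exception_handling` below is a hypothetical helper written for this example; it mirrors the idea of `with_exception_handling`, not Beam's actual implementation:

```python
def run_with_exception_handling(records, infer):
    """Route records whose inference raises into a separate 'failed' list,
    mirroring the dead-letter-queue idea (sketch only, not Beam internals)."""
    main, failed = [], []
    for record in records:
        try:
            main.append(infer(record))
        except Exception as exc:
            failed.append((record, exc))  # dead-letter queue entry
    return main, failed

# A non-numeric record makes the stand-in "inference" raise TypeError;
# it lands in the dead-letter list instead of failing the whole run.
main, dlq = run_with_exception_handling([1, 2, "bad", 4], lambda x: x + 1)
print(main)                 # [2, 3, 5]
print([r for r, _ in dlq])  # ['bad']
```

As in the Beam version, failed records keep enough context (here, the original record plus the exception) to be logged, stored for review, or resubmitted later.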
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
**Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+ model_handler.with_preprocess_fn(
+ lambda x : do_something(x)
+ ).with_preprocess_fn(
+ lambda x : do_something_else(x)
+ ).with_postprocess_fn(
+ lambda x : do_something_after_inference(x)
+ ).with_postprocess_fn(
+ lambda x : do_something_else_after_inference(x)
+ ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function runs after inference, and it should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they run on your
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
Review Comment:
```suggestion
To handle errors robustly while using RunInference, you can use a _dead-letter queue_. The dead-letter queue outputs failed records into a separate `PCollection` for further processing.
```
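The ordering rules in the hunk above (last-applied preprocess function runs first; first-applied postprocess function runs first) can be sketched in plain Python. This is a toy mock invented for illustration, not the apache_beam API: `MockModelHandler`, its wrapping behavior, and the doubling "inference" step are all assumptions; only the chaining semantics mirror what the added docs describe.

```python
# Toy sketch of RunInference's pre/postprocess chaining semantics.
# MockModelHandler and its methods are invented for illustration; they
# mimic the documented ordering, not the real apache_beam API.

class MockModelHandler:
    def __init__(self, pre=None, post=None):
        self._pre = pre or []
        self._post = post or []

    def with_preprocess_fn(self, fn):
        # The most recently added preprocess fn runs first
        # (closest to the raw input), so prepend it.
        return MockModelHandler([fn] + self._pre, self._post)

    def with_postprocess_fn(self, fn):
        # The first-added postprocess fn runs first, so append.
        return MockModelHandler(self._pre, self._post + [fn])

    def run(self, element):
        for fn in self._pre:
            element = fn(element)
        result = element * 2  # stand-in for model inference
        for fn in self._post:
            result = fn(result)
        return result

pre_handler = (MockModelHandler()
               .with_preprocess_fn(lambda x: x + 1)    # applied first in code...
               .with_preprocess_fn(lambda x: x * 10))  # ...but runs before x + 1
print(pre_handler.run(3))  # 3 -> 30 -> 31, then "inference" doubles it: 62

post_handler = (MockModelHandler()
                .with_postprocess_fn(lambda r: r - 1)
                .with_postprocess_fn(lambda r: r // 2))
print(post_handler.run(5))  # inference gives 10, then 10 - 1 = 9, 9 // 2 = 4
```

The wrapping design is why the orderings differ: each `with_preprocess_fn` call adds a layer closer to the raw input, while each `with_postprocess_fn` call adds a layer closer to the final output.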
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
Review Comment:
```suggestion
This `PCollection` can then be analyzed and sent to a storage system, where it can be reviewed and resubmitted to the pipeline, or discarded.
```
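As a concrete illustration of the dead-letter pattern the docs describe, here is a plain-Python sketch, not Beam code: records that fail inference are captured together with their error in a separate collection for later review, resubmission, or discard. The helper name `run_with_dead_letter_queue` and the toy `infer_fn` are invented for this example.

```python
# Plain-Python sketch of the dead-letter-queue pattern: route records that
# fail inference into a separate collection instead of failing the whole run.
# run_with_dead_letter_queue is a made-up helper, not a Beam API.

def run_with_dead_letter_queue(records, infer_fn):
    results, dead_letters = [], []
    for record in records:
        try:
            results.append(infer_fn(record))
        except Exception as exc:
            # Keep the failing record together with its error so it can be
            # reviewed and resubmitted to the pipeline, or discarded.
            dead_letters.append((record, repr(exc)))
    return results, dead_letters

results, failed = run_with_dead_letter_queue(
    ["1", "2", "oops", "4"],
    lambda x: int(x) * 2,  # stand-in for model inference
)
print(results)                 # [2, 4, 8]
print([r for r, _ in failed])  # ['oops']
```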
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rszper commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "rszper (via GitHub)" <gi...@apache.org>.
rszper commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199211452
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,12 @@ with pipeline as p:
For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
-## Use a keyed ModelHandler
+## RunInference Patterns
+
+This section suggests patterns and best practices that can be used to make your inference pipelines simpler,
Review Comment:
```suggestion
This section suggests patterns and best practices that you can use to make your inference pipelines simpler,
```
[GitHub] [beam] github-actions[bot] commented on pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1553627473
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
[GitHub] [beam] damccorm commented on pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #26772:
URL: https://github.com/apache/beam/pull/26772#issuecomment-1553626572
R: @rszper
[GitHub] [beam] damccorm commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support
Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1199220852
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,12 @@ with pipeline as p:
For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
-## Use a keyed ModelHandler
+## RunInference Patterns
+
+This section suggests patterns and best practices that can be used to make your inference pipelines simpler,
Review Comment:
Thanks!