Posted to github@beam.apache.org by "rszper (via GitHub)" <gi...@apache.org> on 2023/05/18 21:20:29 UTC

[GitHub] [beam] rszper commented on a diff in pull request #26772: Add docs on pre/post processing operations and dlq support

rszper commented on code in PR #26772:
URL: https://github.com/apache/beam/pull/26772#discussion_r1198306235


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference

Review Comment:
   I think we could simplify this header. The additional information is in the description.
   
   ```suggestion
   ### Preprocess and postprocess your records
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -192,7 +192,9 @@ with pipeline as p:
 
 For more information on resource hints, see [Resource hints](/documentation/runtime/resource-hints/).
 
-## Use a keyed ModelHandler
+## RunInference Patterns

Review Comment:
   We should include text after this header describing the section. Something along the lines of:
   
   This section provides examples of RunInference patterns.
   
   (But hopefully more informative than that.)



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:

Review Comment:
   ```suggestion
   To apply postprocessing operations, use `with_postprocess_fn` on your model handler:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:

Review Comment:
   ```suggestion
   You can also chain multiple pre- and postprocessing operations:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.

Review Comment:
   ```suggestion
   inference result in the order of first applied to last applied.
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.

Review Comment:
   ```suggestion
   `PCollection` in the order of last applied to first applied.
   ```
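
For anyone double-checking the ordering wording in these paragraphs, here is a minimal plain-Python sketch of the behavior the doc text describes: preprocessing functions run from last applied to first applied, and postprocessing functions run from first applied to last applied. `ToyHandler`, `step`, and `trace` are hypothetical names invented for illustration. This is not Beam's implementation, and it mutates the handler in place for brevity, which the real model handler may not do.

```python
from typing import Any, Callable, List


class ToyHandler:
    """Hypothetical stand-in for a model handler; not Beam's implementation."""

    def __init__(self, infer: Callable[[Any], Any]):
        self._infer = infer
        self._pre: List[Callable[[Any], Any]] = []
        self._post: List[Callable[[Any], Any]] = []

    def with_preprocess_fn(self, fn: Callable[[Any], Any]) -> "ToyHandler":
        # The most recently applied preprocessing function runs first,
        # so it goes to the front of the list.
        self._pre.insert(0, fn)
        return self

    def with_postprocess_fn(self, fn: Callable[[Any], Any]) -> "ToyHandler":
        # Postprocessing functions run in the order they were applied.
        self._post.append(fn)
        return self

    def run(self, record: Any) -> Any:
        for fn in self._pre:
            record = fn(record)
        record = self._infer(record)
        for fn in self._post:
            record = fn(record)
        return record


trace: List[str] = []


def step(name: str) -> Callable[[Any], Any]:
    """Build a pass-through function that records when it was called."""
    def fn(x: Any) -> Any:
        trace.append(name)
        return x
    return fn


handler = (
    ToyHandler(step("infer"))
    .with_preprocess_fn(step("pre_1"))
    .with_preprocess_fn(step("pre_2"))
    .with_postprocess_fn(step("post_1"))
    .with_postprocess_fn(step("post_2"))
)
handler.run("record")
print(trace)  # ['pre_2', 'pre_1', 'infer', 'post_1', 'post_2']
```

The trace shows why the preprocessing order is worth spelling out in the docs: the function added last is the first one to see the original record.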



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:

Review Comment:
   ```suggestion
   To apply preprocessing operations, use `with_preprocess_fn` on your model handler:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection

Review Comment:
   ```suggestion
   The preprocessing function is run before batching and inference. This function maps your input `PCollection`
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original

Review Comment:
   ```suggestion
   to your desired output type. If you apply multiple postprocessing functions, they run on your original
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.

Review Comment:
   ```suggestion
   With RunInference, you can add preprocessing and postprocessing operations to your transform.
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original

Review Comment:
   ```suggestion
   to the base input type of the model handler. If you apply multiple preprocessing functions, they run on your original
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output

Review Comment:
   ```suggestion
   The postprocessing function runs after inference. This function maps the output type of the base model handler
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:

Review Comment:
   ```suggestion
   RunInference has built-in support for dead-letter queues. You can use a dead-letter queue by applying `with_exception_handling` to your RunInference transform:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.
+RunInference has native support for Deadletter Queues; you can use one by applying the `with_exception_handling` to your RunInference transform:
+
+```
+main, other = pcoll | RunInference(model_handler).with_exception_handling()
+other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
+```
+
+You can also apply this pattern to RunInference transforms with associated pre/postprocessing operations:

Review Comment:
   ```suggestion
   You can also apply this pattern to RunInference transforms with associated pre- and postprocessing operations:
   ```



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.

Review Comment:
   ```suggestion
   To handle errors robustly while using RunInference, you can use a _dead-letter queue_. The dead-letter queue outputs failed records into a separate `PCollection` for further processing.
   ```
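
As a reference point for the dead-letter wording, the general pattern can be sketched in plain Python without Beam: successful results go to a main output, and failed records go to a separate output together with the error. `run_with_dead_letter` and `toy_infer` are hypothetical helpers invented for this sketch. They only illustrate the concept and say nothing about how `with_exception_handling` is implemented.

```python
from typing import Any, Callable, Iterable, List, Tuple


def run_with_dead_letter(
    records: Iterable[Any],
    infer: Callable[[Any], Any],
) -> Tuple[List[Any], List[Tuple[Any, str]]]:
    """Apply `infer` to each record, routing failures to a second output."""
    main: List[Any] = []
    failed: List[Tuple[Any, str]] = []
    for record in records:
        try:
            main.append(infer(record))
        except Exception as exc:
            # Keep the original record next to the error so it can be
            # reviewed later and then resubmitted or discarded.
            failed.append((record, repr(exc)))
    return main, failed


def toy_infer(x: float) -> float:
    # Hypothetical "model" that fails on a zero input.
    return 10 / x


main, failed = run_with_dead_letter([1, 2, 0, 5], toy_infer)
print(main)    # [10.0, 5.0, 2.0]
print(failed)  # one entry: the record 0 paired with its error text
```

Keeping the failed record itself, rather than only the error message, is what makes the "review and resubmit" step in the next sentence of the doc possible.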



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -267,7 +264,65 @@ The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache
 **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is
             an update to the side input. This could happen with global windowed side inputs with data driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input.
 
-## Beam Java SDK support
+### Preprocess and postprocess your records before and after inference
+
+RunInference supports adding pre and post processing operations as part of your transform.
+To apply preprocessing operations, use `with_preprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
+```
+
+To apply postprocessing operations, use `with_postprocess_fn` on your ModelHandler:
+
+```
+inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
+```
+
+You can also chain multiple pre/postprocess operations:
+
+```
+inference = pcoll | RunInference(
+    model_handler.with_preprocess_fn(
+      lambda x : do_something(x)
+    ).with_preprocess_fn(
+      lambda x : do_something_else(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_after_inference(x)
+    ).with_postprocess_fn(
+      lambda x : do_something_else_after_inference(x)
+    ))
+```
+
+The preprocessing function will be run before batching/inference and should map your input PCollection
+to the base ModelHandler's input type. If you apply multiple preprocessing functions, they will be run on your original
+PCollection in order from last applied to first applied.
+
+The postprocessing function will be run after inference and should map the base ModelHandler's output
+type to your desired output type. If you apply multiple postprocessing functions, they will be run on your original
+inference result in order from first applied to last applied.
+
+### Handle errors while using RunInference
+
+To handle errors robustly while using RunInference, you can use a _Deadletter Queue (DLQ)_ to output failed records into a separate PCollection for further processing.
+This PCollection can then be analyzed and sent to a storage system so that it can be reviewed and resubmitted to the pipeline or discarded.

Review Comment:
   ```suggestion
   This `PCollection` can then be analyzed and sent to a storage system, where it can be reviewed and resubmitted to the pipeline, or discarded.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
