You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/06 01:39:51 UTC

[GitHub] [beam] ziqi-ma opened a new pull request, #24911: Ziqima/onnx

ziqi-ma opened a new pull request, #24911:
URL: https://github.com/apache/beam/pull/24911

   **Please** add a meaningful description for your change here
   Addresses #22972 
   
   [IN PROGRESS] 
   This is the first portion of PR with API implementation and unit tests that do not require access to remote data/model files. Want to get this reviewed before proceeding with tests with remote dependencies.
   
   DONE:
   - Added onnx_inference.py which is the onnx API
   - Added onnx_inference_test.py which contains unit tests for onnx models based on PyTorch, tensor flow, and sklearn, tested with docker environment specified by start-build-env-onnx.sh on a linux x86 vm.
   
   TODO:
   - Add unit tests that access remote data
   - Add integration tests
   - Figure out what is the correct way to test with multiple environments (whether tox is needed in this case). The onnx module itself does not require anything more than onnxruntime, but to run the complete unit tests we also need to create models and save it in onnx format, which might require tf2onnx, skl2onnx, as well as PyTorch, tensor flow, sklearn.
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1416778924

   you can ignore codecov and the other error is not relevant to this PR and in general...could be flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1407919491

   > > > > > Hi, once the PR is ready for another review, comment on this with `PTAL @<username>`.
   > > > > > Thanks for the starting this.
   > > > > 
   > > > > 
   > > > > PTAL @AnandInguva [I'm still working on the gradle onnxtest (if that is the right direction to go), but in the meantime could you let me know how to put data at gcp location for integration test? I think maybe that should come before the gradle part.]
   > > > 
   > > > 
   > > > Hi, `gs://apache-beam-samples/run_inference` of project `apache-beam-testing` should be open to public. You can upload your required files/folder here for testing.
   > > 
   > > 
   > > For apache-beam-samples/run_inference, I am able to read but not write, getting the error below: User [[ziqima20@gmail.com](mailto:ziqima20@gmail.com)] does not have permission to access b instance [apache-beam-samples] (or it may not exist): [ziqima20@gmail.com](mailto:ziqima20@gmail.com) does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).
   > > However, it seems like the other tests use gs://apache-beam-ml/models/ and gs://apache-beam-ml/datasets. For these I do not have read access.
   > 
   > Yes that is true. You can share the files with me and I can upload them to the GCS bucket for you. You can share them to me at [anandinguva98@gmail.com](mailto:anandinguva98@gmail.com)
   
   Hi @AnandInguva - This PR is ready on my end - I added all the gap-related tests using my own gcp storage. Could you help copy the files below to the beam gcp buckets? (My bucket is open to public)
   
   gs://ziqi-bucket1/torch_2xplus5_onnx   -> gs://apache-beam-ml/models/torch_2xplus5_onnx
   gs://ziqi-bucket1/tf_2xplus5_onnx          -> gs://apache-beam-ml/models/tf_2xplus5_onnx
   gs://ziqi-bucket1/skl_2xplus5_onnx        -> gs://apache-beam-ml/models/skl_2xplus5_onnx
   
   gs://ziqi-bucket1/sentiment_classification_input.txt   -> gs://apache-beam-ml/testing/inputs/onnx/sentiment_classification_input.txt
   gs://ziqi-bucket1/roberta_sentiment_classification.onnx  -> gs://apache-beam-ml/models/roberta_sentiment_classification.onnx
   gs://ziqi-bucket1/sentiment_classification_expected_output.txt -> gs://apache-beam-ml/testing/expected_outputs/test_onnx_run_inference_roberta_sentiment_classification_actuals.txt
   
   Thanks!
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1424744359

   LGTM - we can merge once Ritesh's comments are responded to


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1374404258

   > One question: Do we need GPUs to run the ONNX unit tests? I have very little about ONNX but I will have a read about it during the review.
   
   In the code I did specify ONNX provider in a way (as a priority list) such that if run on NVIDIA GPUs they should run the gpu version but on cpu it should run the cpu version. But I ran my tests on CPU - if we want to make sure this works properly in a GPU environment (and which type of GPU also matters) then I need to test in those environments too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1092162148


##########
sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform sentiment classification
+using RoBERTa.
+
+This pipeline takes sentences from a custom text file, and then uses RoBERTa
+from Hugging Face to predict the sentiment of a given review. The pipeline
+then writes the prediction to an output file in which users can then compare against true labels.
+
+Model is fine-tuned RoBERTa from
+https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb # pylint: disable=line-too-long
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Iterator
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import RobertaTokenizer
+
+
+def tokenize_sentence(text: str,
+                      tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]:
+  tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag.
+  # TODO(https://github.com/apache/beam/issues/21863): Remove once optional
+  # batching flag added
+  return text, torch.tensor(tokenized_sentence).numpy()
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = np.argmax(prediction_result.inference, axis=0)
+    yield filename + ';' + str(prediction)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path of file in which to save the output predictions.')
+  parser.add_argument(
+      '--model_uri',
+      dest='model_uri',
+      required=True,
+      help="Path to the model's uri.")
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class OnnxNoBatchModelHandler(OnnxModelHandlerNumpy):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from RobertaTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = OnnxNoBatchModelHandler(model_uri=known_args.model_uri)
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)
+
+  tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
+
+  if not known_args.input:
+    text = (pipeline | 'CreateSentences' >> beam.Create([
+      'A comedy-drama of nearly epic proportions rooted in a sincere performance by the title character undergoing midlife crisis .', # pylint: disable=line-too-long
+      'There \'s little to recommend Snow Dogs , unless one considers cliched dialogue and perverse escapism a source of high hilarity .', # pylint: disable=line-too-long
+      'It is a terrible movie .',
+      'A welcome relief from baseball movies that try too hard to be mythic , this one is a sweet and modest and ultimately winning story .', # pylint: disable=line-too-long
+      'It almost feels as if the movie is more interested in entertaining itself than in amusing us .', # pylint: disable=line-too-long
+      'Cliche. Not worth watching .',
+      'I \'m sure the filmmaker would disagree , but , honestly , I don\'t see the point .', # pylint: disable=line-too-long
+      'Such a waste of time .',
+      'There is no storyline .',
+      'A very funny romantic comedy .',
+    ]))
+  else:
+    text = (
+        pipeline | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+  text_and_tokenized_text_tuple = (
+      text
+      | 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines)
+      |
+      'TokenizeSentence' >> beam.Map(lambda x: tokenize_sentence(x, tokenizer)))
+  output = (
+      text_and_tokenized_text_tuple
+      | 'PyTorchRunInference' >> RunInference(KeyedModelHandler(model_handler))
+      | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+  output | "WriteOutput" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned

Review Comment:
   Can we try to decrease/solve pylint errors instead of ignoring them?



##########
sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform sentiment classification
+using RoBERTa.
+
+This pipeline takes sentences from a custom text file, and then uses RoBERTa
+from Hugging Face to predict the sentiment of a given review. The pipeline
+then writes the prediction to an output file in which users can then compare against true labels.
+
+Model is fine-tuned RoBERTa from
+https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb # pylint: disable=line-too-long
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Iterator
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import RobertaTokenizer
+
+
+def tokenize_sentence(text: str,
+                      tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]:
+  tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag.
+  # TODO(https://github.com/apache/beam/issues/21863): Remove once optional
+  # batching flag added
+  return text, torch.tensor(tokenized_sentence).numpy()
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = np.argmax(prediction_result.inference, axis=0)
+    yield filename + ';' + str(prediction)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path of file in which to save the output predictions.')
+  parser.add_argument(
+      '--model_uri',
+      dest='model_uri',
+      required=True,
+      help="Path to the model's uri.")
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class OnnxNoBatchModelHandler(OnnxModelHandlerNumpy):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from RobertaTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = OnnxNoBatchModelHandler(model_uri=known_args.model_uri)
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)
+
+  tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
+
+  if not known_args.input:
+    text = (pipeline | 'CreateSentences' >> beam.Create([
+      'A comedy-drama of nearly epic proportions rooted in a sincere performance by the title character undergoing midlife crisis .', # pylint: disable=line-too-long
+      'There \'s little to recommend Snow Dogs , unless one considers cliched dialogue and perverse escapism a source of high hilarity .', # pylint: disable=line-too-long
+      'It is a terrible movie .',
+      'A welcome relief from baseball movies that try too hard to be mythic , this one is a sweet and modest and ultimately winning story .', # pylint: disable=line-too-long
+      'It almost feels as if the movie is more interested in entertaining itself than in amusing us .', # pylint: disable=line-too-long
+      'Cliche. Not worth watching .',
+      'I \'m sure the filmmaker would disagree , but , honestly , I don\'t see the point .', # pylint: disable=line-too-long
+      'Such a waste of time .',
+      'There is no storyline .',
+      'A very funny romantic comedy .',
+    ]))
+  else:
+    text = (
+        pipeline | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+  text_and_tokenized_text_tuple = (
+      text
+      | 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines)
+      |
+      'TokenizeSentence' >> beam.Map(lambda x: tokenize_sentence(x, tokenizer)))
+  output = (
+      text_and_tokenized_text_tuple
+      | 'PyTorchRunInference' >> RunInference(KeyedModelHandler(model_handler))
+      | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+  output | "WriteOutput" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned

Review Comment:
   ```suggestion
     _  = output | "WriteOutput" >> beam.io.WriteToText(
   ```



##########
sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform sentiment classification
+using RoBERTa.
+
+This pipeline takes sentences from a custom text file, and then uses RoBERTa
+from Hugging Face to predict the sentiment of a given review. The pipeline
+then writes the prediction to an output file in which users can then compare against true labels.
+
+Model is fine-tuned RoBERTa from
+https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb # pylint: disable=line-too-long
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Iterator
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import RobertaTokenizer
+
+
+def tokenize_sentence(text: str,
+                      tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]:
+  tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag.
+  # TODO(https://github.com/apache/beam/issues/21863): Remove once optional
+  # batching flag added
+  return text, torch.tensor(tokenized_sentence).numpy()
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = np.argmax(prediction_result.inference, axis=0)
+    yield filename + ';' + str(prediction)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path of file in which to save the output predictions.')
+  parser.add_argument(
+      '--model_uri',
+      dest='model_uri',
+      required=True,
+      help="Path to the model's uri.")
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class OnnxNoBatchModelHandler(OnnxModelHandlerNumpy):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from RobertaTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = OnnxNoBatchModelHandler(model_uri=known_args.model_uri)
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)
+
+  tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
+
+  if not known_args.input:

Review Comment:
   Can we make sure there `input` is required arg and remove this part of code? Reduces complexity.



##########
sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform sentiment classification
+using RoBERTa.
+
+This pipeline takes sentences from a custom text file, and then uses RoBERTa
+from Hugging Face to predict the sentiment of a given review. The pipeline
+then writes the prediction to an output file in which users can then compare against true labels.
+
+Model is fine-tuned RoBERTa from
+https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb # pylint: disable=line-too-long
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Iterator
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import RobertaTokenizer
+
+
+def tokenize_sentence(text: str,
+                      tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]:
+  tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag.
+  # TODO(https://github.com/apache/beam/issues/21863): Remove once optional
+  # batching flag added
+  return text, torch.tensor(tokenized_sentence).numpy()
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = np.argmax(prediction_result.inference, axis=0)
+    yield filename + ';' + str(prediction)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path of file in which to save the output predictions.')
+  parser.add_argument(
+      '--model_uri',
+      dest='model_uri',
+      required=True,
+      help="Path to the model's uri.")
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class OnnxNoBatchModelHandler(OnnxModelHandlerNumpy):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.

Review Comment:
   ```suggestion
       """Wrapper to OnnxModelHandlerNumpy to limit batch size to 1.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on a diff in pull request #24911: Ziqima/onnx

Posted by "jrmccluskey (via GitHub)" <gi...@apache.org>.
jrmccluskey commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1089085848


##########
sdks/python/tox.ini:
##########
@@ -326,3 +326,20 @@ commands =
   # Run all PyTorch unit tests
   # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories.
   /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'
+
+[testenv:py{37,38,39,310}-onnx]
+deps =
+  onnxruntime>=1.13.1,<=1.13.1

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1425265328

   > Since you've already added an example, please add a section to [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/README.md) on how to run it. You can add it in a new PR as well if you'd like.
   
   Added


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1095007404


##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from apache_beam.ml.inference.base import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.onnx_inference import default_numpy_inference_fn
+  from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+
+
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5 for example in self.get_one_feature_samples()])
+    ]
+
+  def get_two_feature_examples(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([3, 10], dtype="float32"),
+        np.array([-14, 0], dtype="float32"),
+        np.array([0.5, 0.5], dtype="float32")
+    ]
+
+  def get_two_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_two_feature_examples(),
+            [
+                f1 * 2.0 + f2 * 3 + 0.5 for f1,
+                f2 in self.get_two_feature_examples()
+            ])
+    ]
+
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_one_feature_model(self):
+    params = [
+        np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")
+    ]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_one_feature_model(self):
+    x = [[0], [1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_two_feature_model(self):
+    x = [[1, 5], [3, 2], [1, 0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+
+
+def _to_numpy(tensor):
+  return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu(
+  ).numpy()
+
+
+class TestOnnxModelHandler(OnnxModelHandlerNumpy):
+  def __init__( #pylint: disable=dangerous-default-value
+      self,
+      model_uri: str,
+      session_options=None,
+      providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
+      provider_options=None,
+      *,
+      inference_fn=default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._session_options = session_options
+    self._providers = providers
+    self._provider_options = provider_options
+    self._model_inference_fn = inference_fn
+
+
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferenceTest(OnnxTestBase):
+  def test_onnx_pytorch_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+
+    model = self.test_data_and_model.get_torch_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+    dummy_input = torch.randn(4, 1, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant-
+                                                # folding for optimization
+                      input_names = ['input'],   # model's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+  def test_num_bytes(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    batched_examples_int = [
+        np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        batched_examples_int[0].itemsize * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        np.array([1, 5], dtype=np.float32),
+        np.array([3, 10], dtype=np.float32),
+        np.array([-14, 0], dtype=np.float32),
+        np.array([0.5, 0.5], dtype=np.float32)
+    ]
+    self.assertEqual(
+        batched_examples_float[0].itemsize * 4,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  def test_namespace(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    self.assertEqual('BeamML_Onnx', inference_runner.get_metrics_namespace())
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferenceTest(OnnxTestBase):
+  def test_onnx_tensorflow_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_tf_one_feature_model()
+
+    path = os.path.join(self.tmpdir, 'my_onnx_tf_path')
+    spec = (tf.TensorSpec((None, 1), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(linear_model,
+    input_signature=spec,
+    opset=13,
+    output_path=path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferenceTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_onnx_sklearn_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_sklearn_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+    self.save_model(linear_model, 1, path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    dummy_input = torch.randn(4, 2, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant
+                                                # folding for optimization
+                      input_names = ['input'],   # odel's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+      model = self.test_data_and_model.get_torch_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/torch_2xplus5_onnx'
+      # first need to download model from remote
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+        model = self.test_data_and_model.get_torch_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    spec = (tf.TensorSpec((None, 2), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(model,
+    input_signature=spec, opset=13, output_path=path)
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+      model = self.test_data_and_model.get_tf_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/tf_2xplus5_onnx'
+
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+        model = self.test_data_and_model.get_tf_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferencePipelineTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+      model = self.test_data_and_model.get_sklearn_two_feature_model()
+      self.save_model(model, 2, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model \

Review Comment:
   Can we wrap/break the long line using parentheses instead of back slash? 
   For reference: From PEP 8 style guide: https://peps.python.org/pep-0008/#maximum-line-length
   
   ```
   
   The preferred way of wrapping long lines is by using Python’s implied line continuation 
   
   inside parentheses, brackets and braces. Long lines can be broken over multiple lines by 
   
   wrapping expressions in parentheses. These should be used in preference to using a 
   
   backslash for line continuation.
   ```
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1095007404


##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from apache_beam.ml.inference.base import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.onnx_inference import default_numpy_inference_fn
+  from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+
+
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5 for example in self.get_one_feature_samples()])
+    ]
+
+  def get_two_feature_examples(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([3, 10], dtype="float32"),
+        np.array([-14, 0], dtype="float32"),
+        np.array([0.5, 0.5], dtype="float32")
+    ]
+
+  def get_two_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_two_feature_examples(),
+            [
+                f1 * 2.0 + f2 * 3 + 0.5 for f1,
+                f2 in self.get_two_feature_examples()
+            ])
+    ]
+
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_one_feature_model(self):
+    params = [
+        np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")
+    ]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_one_feature_model(self):
+    x = [[0], [1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_two_feature_model(self):
+    x = [[1, 5], [3, 2], [1, 0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+
+
+def _to_numpy(tensor):
+  return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu(
+  ).numpy()
+
+
+class TestOnnxModelHandler(OnnxModelHandlerNumpy):
+  def __init__( #pylint: disable=dangerous-default-value
+      self,
+      model_uri: str,
+      session_options=None,
+      providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
+      provider_options=None,
+      *,
+      inference_fn=default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._session_options = session_options
+    self._providers = providers
+    self._provider_options = provider_options
+    self._model_inference_fn = inference_fn
+
+
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferenceTest(OnnxTestBase):
+  def test_onnx_pytorch_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+
+    model = self.test_data_and_model.get_torch_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+    dummy_input = torch.randn(4, 1, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant-
+                                                # folding for optimization
+                      input_names = ['input'],   # model's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+  def test_num_bytes(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    batched_examples_int = [
+        np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        batched_examples_int[0].itemsize * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        np.array([1, 5], dtype=np.float32),
+        np.array([3, 10], dtype=np.float32),
+        np.array([-14, 0], dtype=np.float32),
+        np.array([0.5, 0.5], dtype=np.float32)
+    ]
+    self.assertEqual(
+        batched_examples_float[0].itemsize * 4,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  def test_namespace(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    self.assertEqual('BeamML_Onnx', inference_runner.get_metrics_namespace())
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferenceTest(OnnxTestBase):
+  def test_onnx_tensorflow_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_tf_one_feature_model()
+
+    path = os.path.join(self.tmpdir, 'my_onnx_tf_path')
+    spec = (tf.TensorSpec((None, 1), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(linear_model,
+    input_signature=spec,
+    opset=13,
+    output_path=path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferenceTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_onnx_sklearn_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_sklearn_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+    self.save_model(linear_model, 1, path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    dummy_input = torch.randn(4, 2, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant
+                                                # folding for optimization
+                      input_names = ['input'],   # odel's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+      model = self.test_data_and_model.get_torch_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/torch_2xplus5_onnx'
+      # first need to download model from remote
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+        model = self.test_data_and_model.get_torch_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    spec = (tf.TensorSpec((None, 2), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(model,
+    input_signature=spec, opset=13, output_path=path)
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+      model = self.test_data_and_model.get_tf_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/tf_2xplus5_onnx'
+
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+        model = self.test_data_and_model.get_tf_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferencePipelineTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+      model = self.test_data_and_model.get_sklearn_two_feature_model()
+      self.save_model(model, 2, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model \

Review Comment:
   Can we wrap the long line using parentheses instead of back slash? 
   For reference: From PEP 8 style guide: https://peps.python.org/pep-0008/#maximum-line-length
   
   ```
   
   The preferred way of wrapping long lines is by using Python’s implied line continuation 
   
   inside parentheses, brackets and braces. Long lines can be broken over multiple lines by 
   
   wrapping expressions in parentheses. These should be used in preference to using a 
   
   backslash for line continuation.
   ```
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1416686395

   > Most of this looks good to me.
   > 
   > I would clean up the lint, formatting errors for the checks to go green and we can start finalizing the PR.
   
   Hi -thanks. I fixed the formatting errors but seems like the 2 remaining failures are about files/tests that should not be touched by this PR?
   
   - codecov/patch is complaining 0.00% diff hit (and it seems to be one line not covered in sdks/python/apache_beam/runners/worker/bundle_processor.py? (I'm not sure how to cover it?)
   - py310 test is failing due to a segfault, but I only added onnx tests for py38?
   
   Would be great if you could give some pointers regarding these, thanks!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1425779025

   (above comment was for the CI bot) - I'm going to let the precommit checks run to completion to make sure they are passing, then I will merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1425778476

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1373049571

   # [Codecov](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#24911](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f0e21fb) into [master](https://codecov.io/gh/apache/beam/commit/177306792ce84d7a1e95cf6f4630b940c7435cca?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1773067) will **decrease** coverage by `0.04%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #24911      +/-   ##
   ==========================================
   - Coverage   73.13%   73.09%   -0.05%     
   ==========================================
     Files         731      732       +1     
     Lines       97933    97988      +55     
   ==========================================
     Hits        71620    71620              
   - Misses      24959    25014      +55     
     Partials     1354     1354              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `82.78% <0.00%> (-0.07%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../python/apache\_beam/ml/inference/onnx\_inference.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL29ubnhfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <0.00%> (-0.13%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `73.29% <0.00%> (-0.05%)` | :arrow_down: |
   | [...beam/testing/load\_tests/load\_test\_metrics\_utils.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9sb2FkX3Rlc3RzL2xvYWRfdGVzdF9tZXRyaWNzX3V0aWxzLnB5) | `33.89% <0.00%> (-0.03%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/24911?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `89.86% <0.00%> (+1.38%)` | :arrow_up: |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1399342547

   > > > Hi, once the PR is ready for another review, comment on this with `PTAL @<username>`.
   > > > Thanks for the starting this.
   > > 
   > > 
   > > PTAL @AnandInguva [I'm still working on the gradle onnxtest (if that is the right direction to go), but in the meantime could you let me know how to put data at gcp location for integration test? I think maybe that should come before the gradle part.]
   > 
   > Hi, `gs://apache-beam-samples/run_inference` of project `apache-beam-testing` should be open to public. You can upload your required files/folder here for testing.
   
   Yes I am able to access apache-beam-samples/run_inference. However, it seems like the other tests use gs://apache-beam-ml/models/ or gs://apache-beam-ml/datasets, but I'm getting access denied. Is there a way to get access? Or should I just put the test data/models in examples?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1373105699

   One question: Do we need GPUs to run the ONNX unit tests? I have very little about ONNX but I will have a read about it during the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1072231351


##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,488 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import sys
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:

Review Comment:
   For the tests to run, we can have a tox command like https://github.com/apache/beam/blob/9f7c55f40d3d5bba8b5d1a428f6ff42ef8082bbf/sdks/python/tox.ini#L317 which could install all the dependencies required to run this file. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on pull request #24911: Ziqima/onnx

Posted by "jrmccluskey (via GitHub)" <gi...@apache.org>.
jrmccluskey commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1406649903

   Run PythonDocs PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] riteshghorse commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1424739340

   Since you've already added an example, please add a section to [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/README.md) on how to run it. You can add it in a new PR as well if you'd like.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1400646911

   > > > > Hi, once the PR is ready for another review, comment on this with `PTAL @<username>`.
   > > > > Thanks for the starting this.
   > > > 
   > > > 
   > > > PTAL @AnandInguva [I'm still working on the gradle onnxtest (if that is the right direction to go), but in the meantime could you let me know how to put data at gcp location for integration test? I think maybe that should come before the gradle part.]
   > > 
   > > 
   > > Hi, `gs://apache-beam-samples/run_inference` of project `apache-beam-testing` should be open to public. You can upload your required files/folder here for testing.
   > 
   > For apache-beam-samples/run_inference, I am able to read but not write, getting the error below: User [[ziqima20@gmail.com](mailto:ziqima20@gmail.com)] does not have permission to access b instance [apache-beam-samples] (or it may not exist): [ziqima20@gmail.com](mailto:ziqima20@gmail.com) does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).
   > 
   > However, it seems like the other tests use gs://apache-beam-ml/models/ and gs://apache-beam-ml/datasets. For these I do not have read access.
   
   Yes that is true. You can share the files with me and I can upload them to the GCS bucket for you. You can share them to me at anandinguva98@gmail.com


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1428128649

   It breaks Python PostCommits `hdfsIntegrationTest` task, error message is (taken from https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Python310/470/consoleFull)
   
   ```
   04:11:40 test_1      | Traceback (most recent call last):
   04:11:40 test_1      |   File "/usr/local/bin/tox", line 8, in <module>
   04:11:40 test_1      |     sys.exit(cmdline())
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/session/__init__.py", line 42, in cmdline
   04:11:40 test_1      |     main(args)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/session/__init__.py", line 62, in main
   04:11:40 test_1      |     config = load_config(args)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/session/__init__.py", line 78, in load_config
   04:11:40 test_1      |     config = parseconfig(args)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 262, in parseconfig
   04:11:40 test_1      |     ParseIni(config, config_file, content)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1126, in __init__
   04:11:40 test_1      |     raise tox.exception.ConfigError(
   04:11:40 test_1      | tox.exception.ConfigError: ConfigError: py{38}-onnx-{113} failed with ConfigError: substitution key '38' not found at Traceback (most recent call last):
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1102, in run
   04:11:40 test_1      |     results[name] = cur_self.make_envconfig(name, section, subs, config)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1226, in make_envconfig
   04:11:40 test_1      |     res = meth(env_attr.name, env_attr.default, replace=replace)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1407, in getpath
   04:11:40 test_1      |     path = self.getstring(name, defaultpath, replace=replace)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1486, in getstring
   04:11:40 test_1      |     x = self._replace_if_needed(x, name, replace, crossonly)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1491, in _replace_if_needed
   04:11:40 test_1      |     x = self._replace(x, name=name, crossonly=crossonly)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1517, in _replace
   04:11:40 test_1      |     replaced = Replacer(self, crossonly=crossonly).do_replace(value)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1557, in do_replace
   04:11:40 test_1      |     expanded = substitute_once(value)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1551, in substitute_once
   04:11:40 test_1      |     return self.RE_ITEM_REF.sub(self._replace_match, x)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1598, in _replace_match
   04:11:40 test_1      |     return self._replace_substitution(match)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1633, in _replace_substitution
   04:11:40 test_1      |     val = self._substitute_from_other_section(sub_key)
   04:11:40 test_1      |   File "/usr/local/lib/python3.10/site-packages/tox/config/__init__.py", line 1627, in _substitute_from_other_section
   04:11:40 test_1      |     raise tox.exception.ConfigError("substitution key {!r} not found".format(key))
   04:11:40 test_1      | tox.exception.ConfigError: ConfigError: substitution key '38' not found
   04:11:40 test_1      | 
   ```
   see #25443 for details


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1095007404


##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from apache_beam.ml.inference.base import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.onnx_inference import default_numpy_inference_fn
+  from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+
+
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5 for example in self.get_one_feature_samples()])
+    ]
+
+  def get_two_feature_examples(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([3, 10], dtype="float32"),
+        np.array([-14, 0], dtype="float32"),
+        np.array([0.5, 0.5], dtype="float32")
+    ]
+
+  def get_two_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_two_feature_examples(),
+            [
+                f1 * 2.0 + f2 * 3 + 0.5 for f1,
+                f2 in self.get_two_feature_examples()
+            ])
+    ]
+
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_one_feature_model(self):
+    params = [
+        np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")
+    ]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_one_feature_model(self):
+    x = [[0], [1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_two_feature_model(self):
+    x = [[1, 5], [3, 2], [1, 0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+
+
+def _to_numpy(tensor):
+  return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu(
+  ).numpy()
+
+
+class TestOnnxModelHandler(OnnxModelHandlerNumpy):
+  def __init__( #pylint: disable=dangerous-default-value
+      self,
+      model_uri: str,
+      session_options=None,
+      providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
+      provider_options=None,
+      *,
+      inference_fn=default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._session_options = session_options
+    self._providers = providers
+    self._provider_options = provider_options
+    self._model_inference_fn = inference_fn
+
+
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferenceTest(OnnxTestBase):
+  def test_onnx_pytorch_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+
+    model = self.test_data_and_model.get_torch_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+    dummy_input = torch.randn(4, 1, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant-
+                                                # folding for optimization
+                      input_names = ['input'],   # model's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+  def test_num_bytes(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    batched_examples_int = [
+        np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        batched_examples_int[0].itemsize * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        np.array([1, 5], dtype=np.float32),
+        np.array([3, 10], dtype=np.float32),
+        np.array([-14, 0], dtype=np.float32),
+        np.array([0.5, 0.5], dtype=np.float32)
+    ]
+    self.assertEqual(
+        batched_examples_float[0].itemsize * 4,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  def test_namespace(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    self.assertEqual('BeamML_Onnx', inference_runner.get_metrics_namespace())
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferenceTest(OnnxTestBase):
+  def test_onnx_tensorflow_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_tf_one_feature_model()
+
+    path = os.path.join(self.tmpdir, 'my_onnx_tf_path')
+    spec = (tf.TensorSpec((None, 1), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(linear_model,
+    input_signature=spec,
+    opset=13,
+    output_path=path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferenceTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_onnx_sklearn_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_sklearn_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+    self.save_model(linear_model, 1, path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    dummy_input = torch.randn(4, 2, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant
+                                                # folding for optimization
+                      input_names = ['input'],   # odel's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+      model = self.test_data_and_model.get_torch_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/torch_2xplus5_onnx'
+      # first need to download model from remote
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+        model = self.test_data_and_model.get_torch_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    spec = (tf.TensorSpec((None, 2), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(model,
+    input_signature=spec, opset=13, output_path=path)
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+      model = self.test_data_and_model.get_tf_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/tf_2xplus5_onnx'
+
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+        model = self.test_data_and_model.get_tf_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferencePipelineTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+      model = self.test_data_and_model.get_sklearn_two_feature_model()
+      self.save_model(model, 2, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model \

Review Comment:
   Can we wrap the long line using parentheses instead of back slash? 
   For reference: From PEP 8, style guild https://peps.python.org/pep-0008/#maximum-line-length
   
   ```
   
   The preferred way of wrapping long lines is by using Python’s implied line continuation 
   
   inside parentheses, brackets and braces. Long lines can be broken over multiple lines by 
   
   wrapping expressions in parentheses. These should be used in preference to using a 
   
   backslash for line continuation.
   ```
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1072234910


##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandlerNumpy'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs, inference_args)

Review Comment:
   I am not sure if passing inference_args like this is the right way. 
   
   Let us assume model predict call accepts 
   ```
   def predict(inputs, dropout=True):
      .... 
      # some processing on inputs, calculate output_1, output_2
      if dropout:
            return output_1
      else:
            return output_2
   ```
   We pass the dropout argument via inference_args: `inference_args = {'dropout`: True}. How would we pass this extra input to the `inference_session` ?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1399342059

   > Hi, I will take another review this weekend. If you have some time, you can fix the formatting issues.
   > 
   > ![image](https://user-images.githubusercontent.com/34158215/213797292-459f363b-7d8a-4378-8b67-24f265fea6f6.png)
   
   Fixed yaps


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1096548743


##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,149 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+
+import onnx
+import onnxruntime as ort
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+
+__all__ = ['OnnxModelHandlerNumpy']
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _convert_to_result(

Review Comment:
   #25200 moved this method to apache_beam/ml/inference/utils.py. Can you refactor the change to it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm merged pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #24911:
URL: https://github.com/apache/beam/pull/24911


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma closed pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma closed pull request #24911: Ziqima/onnx
URL: https://github.com/apache/beam/pull/24911


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1424724929

   @ziqi-ma Hi, is it ready for the final review? 
   
   I am not a committer so passing this @jrmccluskey @damccorm @riteshghorse @tvalentyn 
   
   Thanks again for the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on a diff in pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1101952849


##########
sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py:
##########
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform sentiment classification
+using RoBERTa.
+
+This pipeline takes sentences from a custom text file, and then uses RoBERTa
+from Hugging Face to predict the sentiment of a given review. The pipeline
+then writes the prediction to an output file in which users can then compare against true labels.
+
+Model is fine-tuned RoBERTa from
+https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb # pylint: disable=line-too-long
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Iterator
+from typing import Tuple
+
+import numpy as np
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import RobertaTokenizer
+
+
+def tokenize_sentence(text: str,
+                      tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]:
+  tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag.
+  # TODO(https://github.com/apache/beam/issues/21863): Remove once optional
+  # batching flag added
+  return text, torch.tensor(tokenized_sentence).numpy()
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = np.argmax(prediction_result.inference, axis=0)
+    yield filename + ';' + str(prediction)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path of file in which to save the output predictions.')
+  parser.add_argument(
+      '--model_uri',
+      dest='model_uri',
+      required=True,
+      help="Path to the model's uri.")
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class OnnxNoBatchModelHandler(OnnxModelHandlerNumpy):
+    """Wrapper to OnnxModelHandlerNumpy to limit batch size to 1.
+
+    The tokenized strings generated from RobertaTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}

Review Comment:
   I added this as an option to our other Model Handlers in #25370 (and #25398 as a quick follow up) - if you'd like to pick that up as a follow up PR following the same pattern, let me know, if not I will probably put up a quick PR to add it once this is merged.
   
   I don't think that should block merging this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] riteshghorse commented on a diff in pull request #24911: Support ONNX runtime in RunInference API

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1101954405


##########
sdks/python/tox.ini:
##########
@@ -329,3 +331,20 @@ commands =
   # Run all PyTorch unit tests
   # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories.
   /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'
+
+[testenv:py{38}-onnx-{113}]
+deps =
+  onnxruntime==1.13.1
+  pandas==1.5.2
+  torch==1.13.1
+  tensorflow==2.11.0
+  tf2onnx==1.13.0
+  skl2onnx==1.13
+  transformers==4.25.1
+extras = test,gcp
+commands =
+  # Log onnx version for debugging
+  /bin/sh -c "pip freeze | grep -E onnx"
+  # Run all PyTorch unit tests

Review Comment:
   ```suggestion
     # Run all ONNX unit tests
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1095007404


##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from apache_beam.ml.inference.base import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.onnx_inference import default_numpy_inference_fn
+  from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+
+
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5 for example in self.get_one_feature_samples()])
+    ]
+
+  def get_two_feature_examples(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([3, 10], dtype="float32"),
+        np.array([-14, 0], dtype="float32"),
+        np.array([0.5, 0.5], dtype="float32")
+    ]
+
+  def get_two_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_two_feature_examples(),
+            [
+                f1 * 2.0 + f2 * 3 + 0.5 for f1,
+                f2 in self.get_two_feature_examples()
+            ])
+    ]
+
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_one_feature_model(self):
+    params = [
+        np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")
+    ]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_one_feature_model(self):
+    x = [[0], [1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_two_feature_model(self):
+    x = [[1, 5], [3, 2], [1, 0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+
+
+def _to_numpy(tensor):
+  return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu(
+  ).numpy()
+
+
+class TestOnnxModelHandler(OnnxModelHandlerNumpy):
+  def __init__( #pylint: disable=dangerous-default-value
+      self,
+      model_uri: str,
+      session_options=None,
+      providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
+      provider_options=None,
+      *,
+      inference_fn=default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._session_options = session_options
+    self._providers = providers
+    self._provider_options = provider_options
+    self._model_inference_fn = inference_fn
+
+
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferenceTest(OnnxTestBase):
+  def test_onnx_pytorch_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+
+    model = self.test_data_and_model.get_torch_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+    dummy_input = torch.randn(4, 1, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant-
+                                                # folding for optimization
+                      input_names = ['input'],   # model's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+  def test_num_bytes(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    batched_examples_int = [
+        np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        batched_examples_int[0].itemsize * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        np.array([1, 5], dtype=np.float32),
+        np.array([3, 10], dtype=np.float32),
+        np.array([-14, 0], dtype=np.float32),
+        np.array([0.5, 0.5], dtype=np.float32)
+    ]
+    self.assertEqual(
+        batched_examples_float[0].itemsize * 4,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  def test_namespace(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    self.assertEqual('BeamML_Onnx', inference_runner.get_metrics_namespace())
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferenceTest(OnnxTestBase):
+  def test_onnx_tensorflow_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_tf_one_feature_model()
+
+    path = os.path.join(self.tmpdir, 'my_onnx_tf_path')
+    spec = (tf.TensorSpec((None, 1), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(linear_model,
+    input_signature=spec,
+    opset=13,
+    output_path=path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferenceTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_onnx_sklearn_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_sklearn_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+    self.save_model(linear_model, 1, path)
+
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+
+@pytest.mark.uses_onnx
+class OnnxPytorchRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    dummy_input = torch.randn(4, 2, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant
+                                                # folding for optimization
+                      input_names = ['input'],   # odel's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+      model = self.test_data_and_model.get_torch_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/torch_2xplus5_onnx'
+      # first need to download model from remote
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+        model = self.test_data_and_model.get_torch_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxTensorflowRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    spec = (tf.TensorSpec((None, 2), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(model,
+    input_signature=spec, opset=13, output_path=path)
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+      model = self.test_data_and_model.get_tf_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/tf_2xplus5_onnx'
+
+      model_handler = TestOnnxModelHandler(gs_path)
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+        model = self.test_data_and_model.get_tf_two_feature_model()
+        self.exportModelToOnnx(model, path)
+
+        model_handler = TestOnnxModelHandler(path)
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+
+
+@pytest.mark.uses_onnx
+class OnnxSklearnRunInferencePipelineTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+      model = self.test_data_and_model.get_sklearn_two_feature_model()
+      self.save_model(model, 2, path)
+      model_handler = TestOnnxModelHandler(path)
+
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model \

Review Comment:
   Can we wrap the long line using parentheses instead of back slash? 
   
   ```
   For reference: From PEP 8, style guild https://peps.python.org/pep-0008/#maximum-line-length
   
   The preferred way of wrapping long lines is by using Python’s implied line continuation inside parentheses, brackets and braces. Long lines can be broken over multiple lines by wrapping expressions in parentheses. These should be used in preference to using a backslash for line continuation.
   ```
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1385443127

   > > Hi, once the PR is ready for another review, comment on this with `PTAL @<username>`.
   > > Thanks for the starting this.
   > 
   > PTAL @AnandInguva [I'm still working on the gradle onnxtest (if that is the right direction to go), but in the meantime could you let me know how to put data at gcp location for integration test? I think maybe that should come before the gradle part.]
   
   Hi, `gs://apache-beam-samples/run_inference` should be open to public. You can upload your required files/folder here for testing. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1398881315

   Hi, I will take another review this weekend.  If you have some time, you can fix the formatting issues. 
   
   ![image](https://user-images.githubusercontent.com/34158215/213797292-459f363b-7d8a-4378-8b67-24f265fea6f6.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1399342200

   > > Hi, I will take another review this weekend. If you have some time, you can fix the formatting issues.
   > > ![image](https://user-images.githubusercontent.com/34158215/213797292-459f363b-7d8a-4378-8b67-24f265fea6f6.png)
   > 
   > Fixed yaps
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "ziqi-ma (via GitHub)" <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1425266392

   > @ziqi-ma Hi, is it ready for the final review?
   > 
   > I confirm the tests are running for onnx. link: https://ci-beam.apache.org/job/beam_PreCommit_Python_Coverage_Commit/398/testReport/apache_beam.ml.inference.onnx_inference_test/ I am not a committer so passing this @jrmccluskey @damccorm @riteshghorse @tvalentyn
   > 
   > Thanks again for the changes.
   > 
   > Also, can you add this feature to the CHANGES.md defined at https://github.com/apache/beam/blob/master/CHANGES.md?
   > 
   > Something like
   > 
   > https://github.com/apache/beam/blob/64e40d2c018f8e906f4bec32ef67f02734a95721/CHANGES.md?plain=1#L57
   > 
   > I would like to get this before Feb 22nd so that it can be included in next release 2.46.0
   
   Hi - I have fixed all comments. Think this is ready to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #24911: Support ONNX runtime in RunInference API

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1426243602

   Thanks @ziqi-ma!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1377524823

   Hi, once the PR is ready for another review, ping me with `PTAL @<username>`. 
   Thanks for the starting this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1064076443


##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)
+  return ort_outs
+
+
+class OnnxModelHandler(ModelHandler[numpy.ndarray,

Review Comment:
   Are there any other inputs ONNX model handler could accept potentially?



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental

Review Comment:
   I don't think we use this anywhere. Do you want to mark this experimental? If yes, why?



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)
+  return ort_outs
+
+
+class OnnxModelHandler(ModelHandler[numpy.ndarray,
+                                    PredictionResult,
+                                    ort.InferenceSession]):
+  def __init__(
+      self,
+      model_uri: str,
+      *,
+      inference_fn: NumpyInferenceFn = default_numpy_inference_fn):
+    """ Implementation of the ModelHandler interface for onnx
+    using numpy arrays as input.
+
+    Example Usage::
+
+      pcoll | RunInference(OnnxModelHandler(model_uri="my_uri"))
+
+    Args:
+      model_uri: The URI to where the model is saved.
+      inference_fn: The inference function to use.
+        default=default_numpy_inference_fn
+    """
+    self._model_uri = model_uri
+    self._model_inference_fn = inference_fn
+
+  def load_model(self) -> ort.InferenceSession:
+    """Loads and initializes an onnx inference session for processing."""
+    return _load_model(self._model_uri)
+
+  def run_inference(
+      self,
+      batch: Sequence[numpy.ndarray],
+      inference_session: ort.InferenceSession,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    """Runs inferences on a batch of numpy arrays.
+
+    Args:
+      batch: A sequence of examples as numpy arrays. They should
+        be single examples.
+      inference_session: An onnx inference session. Must be runnable with input x where x is sequence of numpy array
+      inference_args: Any additional arguments for an inference.
+
+    Returns:
+      An Iterable of type PredictionResult.
+    """
+    predictions = self._model_inference_fn(inference_session, batch, inference_args)[0]
+
+    return _convert_to_result(batch, predictions)
+
+  def get_num_bytes(self, batch: Sequence[numpy.ndarray]) -> int:
+    """
+    Returns:
+      The number of bytes of data for a batch.
+    """
+    return sum(sys.getsizeof(element) for element in batch)

Review Comment:
   Can we follow this for the num_bytes ?https://github.com/apache/beam/blob/95e53916b6c9de6052ce8fc8409a53bad3d9d517/sdks/python/apache_beam/ml/inference/tensorrt_inference.py#L307



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)

Review Comment:
   ```suggestion
     ort_outs = inference_session.run(None, ort_inputs)
   ```
   ```suggestion
     ort_outs = inference_session.run(None, {**ort_inputs, **inference_args})
   ```
   
   if the user required to pass any extra parameter other than input to the model predict call, they pass it through the `inference_args`. I think it's safe to merge `ort_inputs`, `inference_args`(even though it is empty)
   
   ```
   import onnxruntime as ort
   ort_sess = ort.InferenceSession('loop.onnx')
   outputs = ort_sess.run(None, {'input_data': dummy_input.numpy(),
                                 'loop_range': np.array(9).astype(np.int64)})
   
   loop_range could be an extra parameter passed to the predict call.
   ```



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)
+  return ort_outs
+
+
+class OnnxModelHandler(ModelHandler[numpy.ndarray,

Review Comment:
   ```suggestion
   class OnnxModelHandlerNumpy(ModelHandler[numpy.ndarray,
   ```



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)
+  return ort_outs
+
+
+class OnnxModelHandler(ModelHandler[numpy.ndarray,
+                                    PredictionResult,
+                                    ort.InferenceSession]):
+  def __init__(
+      self,
+      model_uri: str,
+      *,
+      inference_fn: NumpyInferenceFn = default_numpy_inference_fn):
+    """ Implementation of the ModelHandler interface for onnx
+    using numpy arrays as input.

Review Comment:
   Can we also add a note saying the inputs to ONNXModelHandler should be of the same sizes. If different size inputs are expected, user have to explicitly declare batch size as 1.



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):

Review Comment:
   can we move this to the `load_model()`, seems unnecessary in a different method



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency.
+  pass
+
+__all__ = [
+    'OnnxModelHandler'
+]
+
+NumpyInferenceFn = Callable[
+    [Sequence[numpy.ndarray], ort.InferenceSession, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]
+
+
+def _load_model(model_uri):
+  ort_session = ort.InferenceSession(model_uri, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
+  return ort_session
+
+
+def _convert_to_result(
+    batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
+) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    return [
+        PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+    ]
+  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+
+def default_numpy_inference_fn(
+    inference_session: ort.InferenceSession,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  ort_inputs = {inference_session.get_inputs()[0].name: numpy.stack(batch, axis=0)}
+  ort_outs = inference_session.run(None, ort_inputs)
+  return ort_outs
+
+
+class OnnxModelHandler(ModelHandler[numpy.ndarray,
+                                    PredictionResult,
+                                    ort.InferenceSession]):
+  def __init__(
+      self,
+      model_uri: str,
+      *,
+      inference_fn: NumpyInferenceFn = default_numpy_inference_fn):
+    """ Implementation of the ModelHandler interface for onnx
+    using numpy arrays as input.
+
+    Example Usage::
+
+      pcoll | RunInference(OnnxModelHandler(model_uri="my_uri"))
+
+    Args:
+      model_uri: The URI to where the model is saved.
+      inference_fn: The inference function to use.

Review Comment:
   ```suggestion
         inference_fn: The inference function to use.
   ```
   ```suggestion
         inference_fn: The inference function to use on RunInference calls..
   ```



##########
sdks/python/apache_beam/ml/inference/onnx_inference.py:
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import sys
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import Union
+
+import numpy
+import pandas
+import onnx
+import onnxruntime as ort
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.utils.annotations import experimental
+
+try:

Review Comment:
   Can we remove unused dependencies imports?



##########
sdks/python/apache_beam/ml/inference/onnx_inference_test.py:
##########
@@ -0,0 +1,477 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import sys
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import onnx
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow import keras
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from apache_beam.ml.inference.base import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.onnx_inference import default_numpy_inference_fn
+  from apache_beam.ml.inference.onnx_inference import OnnxModelHandler
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+
+
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5
+                      for example in self.get_one_feature_samples()])
+    ]
+
+  def get_two_feature_examples(self):
+    return [
+      np.array([1, 5], dtype="float32"),
+      np.array([3, 10], dtype="float32"),
+      np.array([-14, 0], dtype="float32"),
+      np.array([0.5, 0.5], dtype="float32")
+    ]
+
+  def get_two_feature_predictions(self):
+    return [
+      PredictionResult(ex, pred) for ex,
+      pred in zip(
+        self.get_two_feature_examples(),
+        [f1 * 2.0 + f2 * 3 + 0.5
+        for f1, f2 in self.get_two_feature_examples()])
+        ]
+
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+  
+  def get_tf_one_feature_model(self):
+    params = [np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_one_feature_model(self):
+    x = [[0],[1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+      OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+      ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+
+  def get_sklearn_two_feature_model(self):
+    x = [[1,5],[3,2],[1,0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+    model.fit(x, y)
+    return model
+
+
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+
+def _to_numpy(tensor):
+      return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()
+
+class TestOnnxModelHandler(OnnxModelHandler):
+  def __init__(self,model_uri: str,*,inference_fn = default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._model_inference_fn = inference_fn
+
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+
+@pytest.mark.uses_pytorch

Review Comment:
   uses_pytorch markers runs Pytorch tests, which requires [https://github.com/apache/beam/blob/95e53916b6c9de6052ce8fc8409a53bad3d9d517/sdks/python/apache_beam/ml/inference/torch_tests_requirements.txt](requirements) file to be installed for dependencies. We either add onnx dependencies to that requirements file.
   
   Same goes with tensorflow. https://github.com/apache/beam/blob/95e53916b6c9de6052ce8fc8409a53bad3d9d517/sdks/python/apache_beam/examples/inference/tfx_bsl/requirements.txt



##########
start-build-env-onnx.sh:
##########
@@ -0,0 +1,146 @@
+#!/usr/bin/env bash

Review Comment:
   For my knowledge, why do we need this file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ziqi-ma commented on pull request #24911: Ziqima/onnx

Posted by GitBox <gi...@apache.org>.
ziqi-ma commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1382878454

   > Hi, once the PR is ready for another review, comment on this with `PTAL @<username>`.
   > Thanks for the starting this.
   
   PTAL @AnandInguva 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on a diff in pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on code in PR #24911:
URL: https://github.com/apache/beam/pull/24911#discussion_r1089072252


##########
sdks/python/tox.ini:
##########
@@ -326,3 +326,20 @@ commands =
   # Run all PyTorch unit tests
   # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories.
   /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'
+
+[testenv:py{37,38,39,310}-onnx]

Review Comment:
   You would need to add this to `sdks/python/test-suites/tox/py38/build.gradle` as well. 



##########
sdks/python/tox.ini:
##########
@@ -326,3 +326,20 @@ commands =
   # Run all PyTorch unit tests
   # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories.
   /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'
+
+[testenv:py{37,38,39,310}-onnx]

Review Comment:
   ```suggestion
   [testenv:py{37,38,39,310}-onnx-{113}]
   ```



##########
sdks/python/tox.ini:
##########
@@ -326,3 +326,20 @@ commands =
   # Run all PyTorch unit tests
   # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories.
   /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'
+
+[testenv:py{37,38,39,310}-onnx]
+deps =
+  onnxruntime>=1.13.1,<=1.13.1

Review Comment:
   Can we just pin `onnxruntime==1.13.1`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on pull request #24911: Ziqima/onnx

Posted by "jrmccluskey (via GitHub)" <gi...@apache.org>.
jrmccluskey commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1406650902

   Information on running formatting and linting locally is here: https://cwiki.apache.org/confluence/display/BEAM/Python+Tips#PythonTips-LintandFormattingChecks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1406639242

   cc: @damccorm @riteshghorse @jrmccluskey for reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AnandInguva commented on pull request #24911: Ziqima/onnx

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on PR #24911:
URL: https://github.com/apache/beam/pull/24911#issuecomment-1410614834

   > 
   
   Hi, I uploaded them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org