+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# pytype: skip-file
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+import numpy as np
+import pytest
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+# Protect against environments where the onnx and pytorch libraries are not
+# available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+  import onnxruntime as ort
+  import torch
+  from onnxruntime.capi.onnxruntime_pybind11_state import InvalidArgument
+  import tensorflow as tf
+  import tf2onnx
+  from tensorflow.keras import layers
+  from sklearn import linear_model
+  from skl2onnx import convert_sklearn
+  from skl2onnx.common.data_types import FloatTensorType
+  from import PredictionResult
+  from import RunInference
+  from import default_numpy_inference_fn
+  from import OnnxModelHandlerNumpy
+except ImportError:
+  raise unittest.SkipTest('Onnx dependencies are not installed')
+  from import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+  def generate(self, x):
+    out = self.linear(x) + 0.5
+    return out
+class TestDataAndModel():
+  def get_one_feature_samples(self):
+    return [
+        np.array([1], dtype="float32"),
+        np.array([5], dtype="float32"),
+        np.array([-3], dtype="float32"),
+        np.array([10.0], dtype="float32"),
+    ]
+  def get_one_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_one_feature_samples(),
+            [example * 2.0 + 0.5 for example in self.get_one_feature_samples()])
+    ]
+  def get_two_feature_examples(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([3, 10], dtype="float32"),
+        np.array([-14, 0], dtype="float32"),
+        np.array([0.5, 0.5], dtype="float32")
+    ]
+  def get_two_feature_predictions(self):
+    return [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            self.get_two_feature_examples(),
+            [
+                f1 * 2.0 + f2 * 3 + 0.5 for f1,
+                f2 in self.get_two_feature_examples()
+            ])
+    ]
+  def get_torch_one_feature_model(self):
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+  def get_tf_one_feature_model(self):
+    params = [
+        np.array([[2.0]], dtype="float32"), np.array([0.5], dtype="float32")
+    ]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+  def get_sklearn_one_feature_model(self):
+    x = [[0], [1]]
+    y = [0.5, 2.5]
+    model = linear_model.LinearRegression()
+, y)
+    return model
+  def get_torch_two_feature_model(self):
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    return model
+  def get_tf_two_feature_model(self):
+    params = [np.array([[2.0], [3]]), np.array([0.5], dtype="float32")]
+    linear_layer = layers.Dense(units=1, weights=params)
+    linear_model = tf.keras.Sequential([linear_layer])
+    return linear_model
+  def get_sklearn_two_feature_model(self):
+    x = [[1, 5], [3, 2], [1, 0]]
+    y = [17.5, 12.5, 2.5]
+    model = linear_model.LinearRegression()
+, y)
+    return model
+def _compare_prediction_result(a, b):
+  example_equal = np.array_equal(a.example, b.example)
+  if isinstance(a.inference, dict):
+    return all(
+        x == y for x, y in zip(a.inference.values(),
+                               b.inference.values())) and example_equal
+  return a.inference == b.inference and example_equal
+def _to_numpy(tensor):
+  return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu(
+  ).numpy()
+class TestOnnxModelHandler(OnnxModelHandlerNumpy):
+  def __init__( #pylint: disable=dangerous-default-value
+      self,
+      model_uri: str,
+      session_options=None,
+      providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
+      provider_options=None,
+      *,
+      inference_fn=default_numpy_inference_fn):
+    self._model_uri = model_uri
+    self._session_options = session_options
+    self._providers = providers
+    self._provider_options = provider_options
+    self._model_inference_fn = inference_fn
+class OnnxTestBase(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.test_data_and_model = TestDataAndModel()
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+class OnnxPytorchRunInferenceTest(OnnxTestBase):
+  def test_onnx_pytorch_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    model = self.test_data_and_model.get_torch_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+    dummy_input = torch.randn(4, 1, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant-
+                                                # folding for optimization
+                      input_names = ['input'],   # model's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+  def test_num_bytes(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    batched_examples_int = [
+        np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        batched_examples_int[0].itemsize * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+    batched_examples_float = [
+        np.array([1, 5], dtype=np.float32),
+        np.array([3, 10], dtype=np.float32),
+        np.array([-14, 0], dtype=np.float32),
+        np.array([0.5, 0.5], dtype=np.float32)
+    ]
+    self.assertEqual(
+        batched_examples_float[0].itemsize * 4,
+        inference_runner.get_num_bytes(batched_examples_float))
+  def test_namespace(self):
+    inference_runner = TestOnnxModelHandler("dummy")
+    self.assertEqual('BeamML_Onnx', inference_runner.get_metrics_namespace())
+class OnnxTensorflowRunInferenceTest(OnnxTestBase):
+  def test_onnx_tensorflow_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_tf_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_tf_path')
+    spec = (tf.TensorSpec((None, 1), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(linear_model,
+    input_signature=spec,
+    opset=13,
+    output_path=path)
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+class OnnxSklearnRunInferenceTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+  def test_onnx_sklearn_run_inference(self):
+    examples = self.test_data_and_model.get_one_feature_samples()
+    expected_predictions = self.test_data_and_model.get_one_feature_predictions(
+    )
+    linear_model = self.test_data_and_model.get_sklearn_one_feature_model()
+    path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+    self.save_model(linear_model, 1, path)
+    inference_runner = TestOnnxModelHandler(path)
+    inference_session = ort.InferenceSession(
+        path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
+    )  # this list specifies priority - prioritize gpu if cuda kernel exists
+    predictions = inference_runner.run_inference(examples, inference_session)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+class OnnxPytorchRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    dummy_input = torch.randn(4, 2, requires_grad=True)
+    torch.onnx.export(model,
+                      dummy_input, # model input
+                      path,   # where to save the model
+                      export_params=True, # store the trained parameter weights
+                      opset_version=10, # the ONNX version
+                      do_constant_folding=True, # whether to execute constant
+                                                # folding for optimization
+                      input_names = ['input'],   # odel's input names
+                      output_names = ['output'], # model's output names
+                      dynamic_axes={'input' : {0 : 'batch_size'},
+                                    'output' : {0 : 'batch_size'}})
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+      model = self.test_data_and_model.get_torch_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/torch_2xplus5_onnx'
+      # first need to download model from remote
+      model_handler = TestOnnxModelHandler(gs_path)
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_pytorch_path')
+        model = self.test_data_and_model.get_torch_two_feature_model()
+        self.exportModelToOnnx(model, path)
+        model_handler = TestOnnxModelHandler(path)
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+class OnnxTensorflowRunInferencePipelineTest(OnnxTestBase):
+  def exportModelToOnnx(self, model, path):
+    spec = (tf.TensorSpec((None, 2), tf.float32, name="input"), )
+    _, _ = tf2onnx.convert.from_keras(model,
+    input_signature=spec, opset=13, output_path=path)
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+      model = self.test_data_and_model.get_tf_two_feature_model()
+      self.exportModelToOnnx(model, path)
+      model_handler = TestOnnxModelHandler(path)
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model.get_one_feature_samples()
+      expected_predictions = self.test_data_and_model \
+                                 .get_one_feature_predictions()
+      gs_path = 'gs://apache-beam-ml/models/tf_2xplus5_onnx'
+      model_handler = TestOnnxModelHandler(gs_path)
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(InvalidArgument,
+                                "Got invalid dimensions for input"):
+      with TestPipeline() as pipeline:
+        examples = [np.array([1], dtype="float32")]
+        path = os.path.join(self.tmpdir, 'my_onnx_tensorflow_path')
+        model = self.test_data_and_model.get_tf_two_feature_model()
+        self.exportModelToOnnx(model, path)
+        model_handler = TestOnnxModelHandler(path)
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_handler)
+class OnnxSklearnRunInferencePipelineTest(OnnxTestBase):
+  def save_model(self, model, input_dim, path):
+    # assume float input
+    initial_type = [('float_input', FloatTensorType([None, input_dim]))]
+    onx = convert_sklearn(model, initial_types=initial_type)
+    with open(path, "wb") as f:
+      f.write(onx.SerializeToString())
+  def test_pipeline_local_model_simple(self):
+    with TestPipeline() as pipeline:
+      path = os.path.join(self.tmpdir, 'my_onnx_sklearn_path')
+      model = self.test_data_and_model.get_sklearn_two_feature_model()
+      self.save_model(model, 2, path)
+      model_handler = TestOnnxModelHandler(path)
+      pcoll = pipeline | 'start' >> beam.Create(
+          self.test_data_and_model.get_two_feature_examples())
+      predictions = pcoll | RunInference(model_handler)
+      assert_that(
+          predictions,
+          equal_to(
+              self.test_data_and_model.get_two_feature_predictions(),
+              equals_fn=_compare_prediction_result))
+  @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = self.test_data_and_model \

