Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/21 16:45:26 UTC

[beam] branch master updated: [BEAM-13984] Implement RunInference for PyTorch (#17196)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 59542096cb5 [BEAM-13984] Implement RunInference for PyTorch (#17196)
59542096cb5 is described below

commit 59542096cb58dd352dd6d15928ddae3563d8770a
Author: Andy Ye <an...@gmail.com>
AuthorDate: Thu Apr 21 11:45:16 2022 -0500

    [BEAM-13984] Implement RunInference for PyTorch (#17196)
    
    * Initial pytorch implementation
    
    * Clean up pytorch implementation; Works for single example
    
    * Fix for multiple examples in a batch
    
    * Fix header and documentation
    
    * Add multifeature tests; Add numpy/tensor conversion
    
    * Add torch to setup.py
    
    * Add ml to tox.ini
    
    * Remove numpy checks/conversions; Address PR comments
    
    * Remove GPU code and test
    
    * Add Filesystems
    
    * Add separate pytorch install and tox test
    
    * Fix typos in gradle and tox files
    
    * Add separate tox tests for pytorch; Remove torch setup install
    
    * fix import error
    
    * Add unittest main()
    
    * Add PredictionResult; Refactor tests
    
    * Fix docs; Remove keyed test
    
    * Add gcp to tox
---
 sdks/python/apache_beam/ml/inference/pytorch.py    | 101 ++++++++++
 .../apache_beam/ml/inference/pytorch_test.py       | 209 +++++++++++++++++++++
 .../container/license_scripts/dep_urls_py.yaml     |   2 +
 sdks/python/pytest.ini                             |   1 +
 sdks/python/test-suites/tox/py38/build.gradle      |   9 +
 sdks/python/tox.ini                                |  12 ++
 6 files changed, 334 insertions(+)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch.py b/sdks/python/apache_beam/ml/inference/pytorch.py
new file mode 100644
index 00000000000..62741a899e8
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/pytorch.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  This class runs Pytorch inferences with the run_inference method. It also
+  provides methods to get the byte size of a batch of Tensors and the metrics
+  namespace for Pytorch models.
+  """
+  def __init__(self, device: torch.device):
+    self._device = device
+
+  def run_inference(self, batch: List[torch.Tensor],
+                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+    """
+    Runs inferences on a batch of Tensors and returns an Iterable of
+    PredictionResults.
+
+    This method stacks the list of Tensors in a vectorized format to optimize
+    the inference call.
+    """
+
+    batch = torch.stack(batch)
+    if batch.device != self._device:
+      batch = batch.to(self._device)
+    predictions = model(batch)
+    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+  def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
+    """Returns the number of bytes of data for a batch of Tensors."""
+    return sum((el.element_size() for tensor in batch for el in tensor))
+
+  def get_metrics_namespace(self) -> str:
+    """
+    Returns a namespace for metrics collected by the RunInference transform.
+    """
+    return 'RunInferencePytorch'
+
+
+class PytorchModelLoader(ModelLoader):
+  """Loads a Pytorch Model."""
+  def __init__(
+      self,
+      state_dict_path: str,
+      model_class: torch.nn.Module,
+      device: str = 'CPU'):
+    """
+    state_dict_path: path to the saved dictionary of the model state.
+    model_class: class of the Pytorch model that defines the model structure.
+    device: the device on which you wish to run the model. If ``device = GPU``
+        then the device will be cuda if available. Otherwise, it will be cpu.
+
+    See https://pytorch.org/tutorials/beginner/saving_loading_models.html
+    for details
+    """
+    self._state_dict_path = state_dict_path
+    if device == 'GPU' and torch.cuda.is_available():
+      self._device = torch.device('cuda')
+    else:
+      self._device = torch.device('cpu')
+    self._model_class = model_class
+    self._model_class.to(self._device)
+    self._inference_runner = PytorchInferenceRunner(device=self._device)
+
+  def load_model(self) -> torch.nn.Module:
+    """Loads and initializes a Pytorch model for processing."""
+    model = self._model_class
+    file = FileSystems.open(self._state_dict_path, 'rb')
+    model.load_state_dict(torch.load(file))
+    model.eval()
+    return model
+
+  def get_inference_runner(self) -> InferenceRunner:
+    """Returns a Pytorch implementation of InferenceRunner."""
+    return self._inference_runner
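
For orientation, a minimal sketch of how the new loader is meant to be wired
into a pipeline, mirroring the unit tests in pytorch_test.py below; the model
class, the state_dict path, and the example values are illustrative only and
are not part of this commit:

    import numpy as np
    import torch

    import apache_beam as beam
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch import PytorchModelLoader


    class LinearRegression(torch.nn.Module):
      """Toy model; stands in for any torch.nn.Module with a saved state_dict."""
      def __init__(self, input_dim=1, output_dim=1):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, output_dim)

      def forward(self, x):
        return self.linear(x)


    with beam.Pipeline() as pipeline:
      examples = [
          torch.from_numpy(np.array([x], dtype="float32"))
          for x in (1.0, 5.0, 10.0)
      ]
      model_loader = PytorchModelLoader(
          # Hypothetical path; a state_dict previously saved with
          # torch.save(model.state_dict(), path) is assumed to exist here.
          state_dict_path='/tmp/linear_regression_state_dict.pth',
          model_class=LinearRegression(input_dim=1, output_dim=1))
      _ = (
          pipeline
          | 'CreateExamples' >> beam.Create(examples)
          | 'PytorchRunInference' >> RunInference(model_loader)
          # Each output element is a PredictionResult(example, inference).
          | 'PrintResults' >> beam.Map(print))
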
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_test.py b/sdks/python/apache_beam/ml/inference/pytorch_test.py
new file mode 100644
index 00000000000..9a8d3871c09
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/pytorch_test.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import torch
+  from apache_beam.ml.inference.api import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+  from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+  raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+  return (
+      torch.equal(a.inference, b.inference) and
+      torch.equal(a.example, b.example))
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+
+@pytest.mark.uses_pytorch
+class PytorchRunInferenceTest(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_inference_runner_single_tensor_feature(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 0.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_inference_runner_multiple_tensor_features(self):
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    examples = [
+        torch.from_numpy(np.array([1, 5], dtype="float32")),
+        torch.from_numpy(np.array([3, 10], dtype="float32")),
+        torch.from_numpy(np.array([-14, 0], dtype="float32")),
+        torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                          for f1, f2 in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_num_bytes(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    self.assertEqual((examples[0].element_size()) * 8,
+                     inference_runner.get_num_bytes(examples))
+
+  def test_namespace(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    self.assertEqual(
+        'RunInferencePytorch', inference_runner.get_metrics_namespace())
+
+  def test_pipeline_local_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                   dtype="float32")).reshape(-1, 2)
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                            for f1, f2 in examples]).reshape(-1, 1))
+      ]
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=2, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_loader)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([example * 2.0 + 0.5
+                            for example in examples]).reshape(-1, 1))
+      ]
+
+      gs_pth = 'gs://apache-beam-ml/pytorch_lin_reg_model_2x+0.5_state_dict.pth'
+      model_loader = PytorchModelLoader(
+          state_dict_path=gs_pth,
+          model_class=PytorchLinearRegression(input_dim=1, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_loader)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_invalid_input_type(self):
+    with self.assertRaisesRegex(TypeError, "expected Tensor as element"):
+      with TestPipeline() as pipeline:
+        examples = np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1)
+
+        state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                                  ('linear.bias', torch.Tensor([0.5]))])
+        path = os.path.join(self.tmpdir, 'my_state_dict_path')
+        torch.save(state_dict, path)
+
+        model_loader = PytorchModelLoader(
+            state_dict_path=path,
+            model_class=PytorchLinearRegression(input_dim=1, output_dim=1))
+
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # pylint: disable=expression-not-assigned
+        pcoll | RunInference(model_loader)
+
+
+if __name__ == '__main__':
+  unittest.main()
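
Note that test_pipeline_gcs_model reads its state dict from a gs:// path, which
is presumably why the new tox environments below install the gcp extra in
addition to test; running that test locally requires access to that bucket.
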
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index f1df6f5ea07..9d8b4201656 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -145,5 +145,7 @@ pip_dependencies:
     license: "https://raw.githubusercontent.com/PAIR-code/what-if-tool/master/LICENSE"
   timeloop:
     license: "https://raw.githubusercontent.com/sankalpjonn/timeloop/master/LICENSE"
+  torch:
+    license: "https://raw.githubusercontent.com/pytorch/pytorch/master/LICENSE"
   wget:
     license: "https://raw.githubusercontent.com/mirror/wget/master/COPYING"
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 4a52bce2090..f24c9a9352e 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -45,6 +45,7 @@ markers =
     no_xdist: run without pytest-xdist plugin
     # We run these tests with multiple major pyarrow versions (BEAM-11211)
     uses_pyarrow: tests that utilize pyarrow in some way
+    uses_pytorch: tests that utilize pytorch in some way
 
 # Default timeout intended for unit tests.
 # If certain tests need a different value, please see the docs on how to
diff --git a/sdks/python/test-suites/tox/py38/build.gradle b/sdks/python/test-suites/tox/py38/build.gradle
index 4992c8750bf..2a5ac1482ff 100644
--- a/sdks/python/test-suites/tox/py38/build.gradle
+++ b/sdks/python/test-suites/tox/py38/build.gradle
@@ -85,6 +85,15 @@ toxTask "testPy38pandas-14", "py38-pandas-14"
 test.dependsOn "testPy38pandas-14"
 preCommitPy38.dependsOn "testPy38pandas-14"
 
+// Create a test task for each minor version of pytorch
+toxTask "testPy38pytorch-19", "py38-pytorch-19"
+test.dependsOn "testPy38pytorch-19"
+preCommitPy38.dependsOn "testPy38pytorch-19"
+
+toxTask "testPy38pytorch-110", "py38-pytorch-110"
+test.dependsOn "testPy38pytorch-110"
+preCommitPy38.dependsOn "testPy38pytorch-110"
+
 toxTask "whitespacelint", "whitespacelint"
 
 task archiveFilesToLint(type: Zip) {
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 8db1162d151..858db8590a8 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -264,3 +264,15 @@ commands =
   /bin/sh -c "pip freeze | grep -E '(pandas|numpy)'"
   # Run all DataFrame API unit tests
   {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'
+
+[testenv:py{37,38,39}-pytorch-{19,110}]
+deps =
+  -r build-requirements.txt
+  19: torch>=1.9.0,<1.10.0
+  110: torch>=1.10.0,<1.11.0
+extras = test,gcp
+commands =
+  # Log torch version for debugging
+  /bin/sh -c "pip freeze | grep -E torch"
+  # Run all PyTorch unit tests
+  pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}
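
The new environments can be run locally in the usual way, for example with
"tox -e py38-pytorch-19" from sdks/python (assuming tox is installed), or via
the Gradle tasks testPy38pytorch-19 and testPy38pytorch-110 added in
build.gradle above.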