You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/21 16:45:26 UTC
[beam] branch master updated: [BEAM-13984] Implement RunInference for PyTorch (#17196)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 59542096cb5 [BEAM-13984] Implement RunInference for PyTorch (#17196)
59542096cb5 is described below
commit 59542096cb58dd352dd6d15928ddae3563d8770a
Author: Andy Ye <an...@gmail.com>
AuthorDate: Thu Apr 21 11:45:16 2022 -0500
[BEAM-13984] Implement RunInference for PyTorch (#17196)
* Initial pytorch implementation
* Clean up pytorch implementation; Works for single example
* Fix for multiple examples in a batch
* Fix header and documentation
* Add multifeature tests; Add numpy/tensor conversion
* Add torch to setup.py
* Add ml to tox.ini
* Remove numpy checks/conversions; Address PR comments
* Remove GPU code and test
* Add Filesystems
* Add separate pytorch install and tox test
* Fix typos in gradle and tox files
* Add separate tox tests for pytorch; Remove torch setup install
* fix import error
* Add unittest main()
* Add PredictionResult; Refactor tests
* Fix docs; Remove keyed test
* Add gcp to tox
---
sdks/python/apache_beam/ml/inference/pytorch.py | 101 ++++++++++
.../apache_beam/ml/inference/pytorch_test.py | 209 +++++++++++++++++++++
.../container/license_scripts/dep_urls_py.yaml | 2 +
sdks/python/pytest.ini | 1 +
sdks/python/test-suites/tox/py38/build.gradle | 9 +
sdks/python/tox.ini | 12 ++
6 files changed, 334 insertions(+)
diff --git a/sdks/python/apache_beam/ml/inference/pytorch.py b/sdks/python/apache_beam/ml/inference/pytorch.py
new file mode 100644
index 00000000000..62741a899e8
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/pytorch.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
class PytorchInferenceRunner(InferenceRunner):
  """
  This class runs Pytorch inferences with the run_inference method. It also has
  other methods to get the bytes of a batch of Tensors as well as the namespace
  for Pytorch models.
  """
  def __init__(self, device: torch.device):
    # Device the model runs on (e.g. torch.device('cpu') or 'cuda').
    self._device = device

  def run_inference(self, batch: List[torch.Tensor],
                    model: torch.nn.Module) -> Iterable[torch.Tensor]:
    """
    Runs inferences on a batch of Tensors and returns an Iterable of
    Tensor Predictions.

    This method stacks the list of Tensors in a vectorized format to optimize
    the inference call.
    """
    batch = torch.stack(batch)
    if batch.device != self._device:
      batch = batch.to(self._device)
    # Gradients are never needed at inference time; skipping autograd graph
    # construction saves both time and memory.
    with torch.no_grad():
      predictions = model(batch)
    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

  def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
    """Returns the number of bytes of data for a batch of Tensors."""
    # element_size() is the per-scalar byte width and nelement() the total
    # scalar count, so this is exact for tensors of any rank. (The previous
    # `el.element_size() for tensor in batch for el in tensor` undercounted
    # rank >= 2 tensors: iterating a tensor yields sub-tensors, not scalars,
    # so each whole sub-tensor was billed as a single element.)
    return sum(tensor.element_size() * tensor.nelement() for tensor in batch)

  def get_metrics_namespace(self) -> str:
    """
    Returns a namespace for metrics collected by the RunInference transform.
    """
    return 'RunInferencePytorch'
+
+
class PytorchModelLoader(ModelLoader):
  """Loads a Pytorch Model."""
  def __init__(
      self,
      state_dict_path: str,
      model_class: torch.nn.Module,
      device: str = 'CPU'):
    """
    state_dict_path: path to the saved dictionary of the model state.
    model_class: instance of the Pytorch model that defines the model
      structure (despite the name, callers pass a constructed module, and
      it is used as one below).
    device: the device on which you wish to run the model. If ``device = GPU``
      then device will be cuda if it is available. Otherwise, it will be cpu.

    See https://pytorch.org/tutorials/beginner/saving_loading_models.html
    for details
    """
    self._state_dict_path = state_dict_path
    # Fall back to CPU when a GPU is requested but unavailable, so the same
    # pipeline code runs in both environments.
    if device == 'GPU' and torch.cuda.is_available():
      self._device = torch.device('cuda')
    else:
      self._device = torch.device('cpu')
    self._model_class = model_class
    self._model_class.to(self._device)
    self._inference_runner = PytorchInferenceRunner(device=self._device)

  def load_model(self) -> torch.nn.Module:
    """Loads and initializes a Pytorch model for processing."""
    model = self._model_class
    # Close the (possibly remote) file handle deterministically instead of
    # leaking it until garbage collection.
    with FileSystems.open(self._state_dict_path, 'rb') as file:
      # map_location remaps storages so a state dict saved on GPU still
      # loads on a CPU-only worker (and vice versa).
      state_dict = torch.load(file, map_location=self._device)
    model.load_state_dict(state_dict)
    model.eval()
    return model

  def get_inference_runner(self) -> InferenceRunner:
    """Returns a Pytorch implementation of InferenceRunner."""
    return self._inference_runner
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_test.py b/sdks/python/apache_beam/ml/inference/pytorch_test.py
new file mode 100644
index 00000000000..9a8d3871c09
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/pytorch_test.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+ import torch
+ from apache_beam.ml.inference.api import PredictionResult
+ from apache_beam.ml.inference.base import RunInference
+ from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+ from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+ raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+ return (
+ torch.equal(a.inference, b.inference) and
+ torch.equal(a.example, b.example))
+
+
class PytorchLinearRegression(torch.nn.Module):
  """A minimal linear-regression model: one fully-connected layer."""
  def __init__(self, input_dim, output_dim):
    super().__init__()
    # Named `linear` so saved state dicts use the keys
    # 'linear.weight' / 'linear.bias'.
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    return self.linear(x)
+
+
@pytest.mark.uses_pytorch
class PytorchRunInferenceTest(unittest.TestCase):
  """Tests for PytorchInferenceRunner and PytorchModelLoader.

  Covers direct inference-runner calls, byte accounting, the metrics
  namespace, and end-to-end RunInference pipelines with local and GCS
  state dicts.
  """
  def setUp(self):
    # Per-test scratch directory for saved state-dict files.
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def test_inference_runner_single_tensor_feature(self):
    """Inference over single-feature examples with a y = 2x + 0.5 model."""
    examples = [
        torch.from_numpy(np.array([1], dtype="float32")),
        torch.from_numpy(np.array([5], dtype="float32")),
        torch.from_numpy(np.array([-3], dtype="float32")),
        torch.from_numpy(np.array([10.0], dtype="float32")),
    ]
    expected_predictions = [
        PredictionResult(ex, pred) for ex,
        pred in zip(
            examples,
            torch.Tensor([example * 2.0 + 0.5
                          for example in examples]).reshape(-1, 1))
    ]

    model = PytorchLinearRegression(input_dim=1, output_dim=1)
    model.load_state_dict(
        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
                     ('linear.bias', torch.Tensor([0.5]))]))
    model.eval()

    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
    predictions = inference_runner.run_inference(examples, model)
    for actual, expected in zip(predictions, expected_predictions):
      self.assertTrue(_compare_prediction_result(actual, expected))

  def test_inference_runner_multiple_tensor_features(self):
    """Inference over two-feature examples with a y = 2*f1 + 3*f2 + 0.5 model."""
    # NOTE(review): a dead assignment of `examples` (a reshaped 4x2 tensor
    # immediately overwritten by this list literal) was removed here.
    examples = [
        torch.from_numpy(np.array([1, 5], dtype="float32")),
        torch.from_numpy(np.array([3, 10], dtype="float32")),
        torch.from_numpy(np.array([-14, 0], dtype="float32")),
        torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
    ]
    expected_predictions = [
        PredictionResult(ex, pred) for ex,
        pred in zip(
            examples,
            torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
                          for f1, f2 in examples]).reshape(-1, 1))
    ]

    model = PytorchLinearRegression(input_dim=2, output_dim=1)
    model.load_state_dict(
        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
                     ('linear.bias', torch.Tensor([0.5]))]))
    model.eval()

    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
    predictions = inference_runner.run_inference(examples, model)
    for actual, expected in zip(predictions, expected_predictions):
      self.assertTrue(_compare_prediction_result(actual, expected))

  def test_num_bytes(self):
    """get_num_bytes counts every scalar element across the batch."""
    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
    # 4 rows x 2 float32 features = 8 scalar elements.
    examples = torch.from_numpy(
        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
                 dtype="float32")).reshape(-1, 2)
    self.assertEqual((examples[0].element_size()) * 8,
                     inference_runner.get_num_bytes(examples))

  def test_namespace(self):
    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
    self.assertEqual(
        'RunInferencePytorch', inference_runner.get_metrics_namespace())

  def test_pipeline_local_model(self):
    """End-to-end RunInference with a state dict saved on local disk."""
    with TestPipeline() as pipeline:
      examples = torch.from_numpy(
          np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
                   dtype="float32")).reshape(-1, 2)
      expected_predictions = [
          PredictionResult(ex, pred) for ex,
          pred in zip(
              examples,
              torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
                            for f1, f2 in examples]).reshape(-1, 1))
      ]

      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
                                ('linear.bias', torch.Tensor([0.5]))])
      path = os.path.join(self.tmpdir, 'my_state_dict_path')
      torch.save(state_dict, path)

      model_loader = PytorchModelLoader(
          state_dict_path=path,
          model_class=PytorchLinearRegression(input_dim=2, output_dim=1))

      pcoll = pipeline | 'start' >> beam.Create(examples)
      predictions = pcoll | RunInference(model_loader)
      assert_that(
          predictions,
          equal_to(expected_predictions, equals_fn=_compare_prediction_result))

  def test_pipeline_gcs_model(self):
    """End-to-end RunInference loading the state dict from GCS."""
    with TestPipeline() as pipeline:
      examples = torch.from_numpy(
          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
      expected_predictions = [
          PredictionResult(ex, pred) for ex,
          pred in zip(
              examples,
              torch.Tensor([example * 2.0 + 0.5
                            for example in examples]).reshape(-1, 1))
      ]

      gs_pth = 'gs://apache-beam-ml/pytorch_lin_reg_model_2x+0.5_state_dict.pth'
      model_loader = PytorchModelLoader(
          state_dict_path=gs_pth,
          model_class=PytorchLinearRegression(input_dim=1, output_dim=1))

      pcoll = pipeline | 'start' >> beam.Create(examples)
      predictions = pcoll | RunInference(model_loader)
      assert_that(
          predictions,
          equal_to(expected_predictions, equals_fn=_compare_prediction_result))

  def test_invalid_input_type(self):
    """A batch of non-Tensor elements must fail inside torch.stack."""
    with self.assertRaisesRegex(TypeError, "expected Tensor as element"):
      with TestPipeline() as pipeline:
        # Raw numpy rows, not torch Tensors — run_inference should reject.
        examples = np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1)

        state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
                                  ('linear.bias', torch.Tensor([0.5]))])
        path = os.path.join(self.tmpdir, 'my_state_dict_path')
        torch.save(state_dict, path)

        model_loader = PytorchModelLoader(
            state_dict_path=path,
            model_class=PytorchLinearRegression(input_dim=1, output_dim=1))

        pcoll = pipeline | 'start' >> beam.Create(examples)
        # pylint: disable=expression-not-assigned
        pcoll | RunInference(model_loader)


if __name__ == '__main__':
  unittest.main()
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index f1df6f5ea07..9d8b4201656 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -145,5 +145,7 @@ pip_dependencies:
license: "https://raw.githubusercontent.com/PAIR-code/what-if-tool/master/LICENSE"
timeloop:
license: "https://raw.githubusercontent.com/sankalpjonn/timeloop/master/LICENSE"
+ torch:
+ license: "https://raw.githubusercontent.com/pytorch/pytorch/master/LICENSE"
wget:
license: "https://raw.githubusercontent.com/mirror/wget/master/COPYING"
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 4a52bce2090..f24c9a9352e 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -45,6 +45,7 @@ markers =
no_xdist: run without pytest-xdist plugin
# We run these tests with multiple major pyarrow versions (BEAM-11211)
uses_pyarrow: tests that utilize pyarrow in some way
+ uses_pytorch: tests that utilize pytorch in some way
# Default timeout intended for unit tests.
# If certain tests need a different value, please see the docs on how to
diff --git a/sdks/python/test-suites/tox/py38/build.gradle b/sdks/python/test-suites/tox/py38/build.gradle
index 4992c8750bf..2a5ac1482ff 100644
--- a/sdks/python/test-suites/tox/py38/build.gradle
+++ b/sdks/python/test-suites/tox/py38/build.gradle
@@ -85,6 +85,15 @@ toxTask "testPy38pandas-14", "py38-pandas-14"
test.dependsOn "testPy38pandas-14"
preCommitPy38.dependsOn "testPy38pandas-14"
+// Create a test task for each minor version of pytorch
+toxTask "testPy38pytorch-19", "py38-pytorch-19"
+test.dependsOn "testPy38pytorch-19"
+preCommitPy38.dependsOn "testPy38pytorch-19"
+
+toxTask "testPy38pytorch-110", "py38-pytorch-110"
+test.dependsOn "testPy38pytorch-110"
+preCommitPy38.dependsOn "testPy38pytorch-110"
+
toxTask "whitespacelint", "whitespacelint"
task archiveFilesToLint(type: Zip) {
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 8db1162d151..858db8590a8 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -264,3 +264,15 @@ commands =
/bin/sh -c "pip freeze | grep -E '(pandas|numpy)'"
# Run all DataFrame API unit tests
{toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'
+
+[testenv:py{37,38,39}-pytorch-{19,110}]
+deps =
+ -r build-requirements.txt
+ 19: torch>=1.9.0,<1.10.0
+ 110: torch>=1.10.0,<1.11.0
+extras = test,gcp
+commands =
+ # Log torch version for debugging
+ /bin/sh -c "pip freeze | grep -E torch"
+ # Run all PyTorch unit tests
+ pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pytorch {posargs}