You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/22 18:54:20 UTC
[beam] branch master updated: [BEAM-13983] Sklearn Loader for RunInference (#17368)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0362711d5a9 [BEAM-13983] Sklearn Loader for RunInference (#17368)
0362711d5a9 is described below
commit 0362711d5a91404795ddac38add9147f0f9b2c0a
Author: Ryan Thompson <ry...@gmail.com>
AuthorDate: Fri Apr 22 14:54:09 2022 -0400
[BEAM-13983] Sklearn Loader for RunInference (#17368)
* first commit
* added sklearn_loader
* added sklearn
* removed base remove
* added unit tests
* lint
* fixed exception test
* added more unit tests
* linted
* added joblib to test packages
* linted imports
* added sklearn to the test queue
* move scikitlearn
* fixed unit tests
* fixed asserts
* linted imports
* renamed SKLearn to Sklearn
* added unit test for bad serialization type
* use direct imports
* removed unused imports
* linted
* renamed serialization url
* fixed test with new name
* removed joblib as required dep
* used tmpfile close instead of flush, to see if that fixes windows permission issue
* removed skip if
* added more specific assert
* fixed unit tests
* changed to hopefully work with windows tests
* changed to file close
* remove windows version
* added platform import
* fixed import order
* skip temporary file opening on windows unit tests
* adding sklearn deps somehow affects the lint order
* ignore bad_input_type test
---
.../apache_beam/examples/complete/distribopt.py | 2 +-
.../apache_beam/ml/inference/sklearn_loader.py | 78 +++++++++
.../ml/inference/sklearn_loader_test.py | 176 +++++++++++++++++++++
.../apache_beam/tools/map_fn_microbenchmark.py | 3 +-
sdks/python/setup.py | 2 +
5 files changed, 259 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/examples/complete/distribopt.py b/sdks/python/apache_beam/examples/complete/distribopt.py
index fb797a05dda..89c312fcbf5 100644
--- a/sdks/python/apache_beam/examples/complete/distribopt.py
+++ b/sdks/python/apache_beam/examples/complete/distribopt.py
@@ -58,12 +58,12 @@ import uuid
from collections import defaultdict
import numpy as np
+from scipy.optimize import minimize
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
-from scipy.optimize import minimize
class Simulator(object):
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader.py b/sdks/python/apache_beam/ml/inference/sklearn_loader.py
new file mode 100644
index 00000000000..53adae0cbf4
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/sklearn_loader.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import enum
+import pickle
+import sys
+from typing import Any
+from typing import Iterable
+from typing import List
+
+import numpy
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+try:
+ import joblib
+except ImportError:
+ # joblib is an optional dependency.
+ pass
+
+
+class ModelFileType(enum.Enum):
+ """Serialization formats that SklearnModelLoader.load_model can read."""
+ # Model file that is read back with pickle.load.
+ PICKLE = 1
+ # Model file that is read back with joblib.load (joblib is optional).
+ JOBLIB = 2
+
+
+class SklearnInferenceRunner(InferenceRunner):
+  """Runs a model exposing ``predict`` over batches of numpy arrays."""
+  def run_inference(self, batch: List[numpy.ndarray],
+                    model: Any) -> Iterable[PredictionResult]:
+    """Predicts every example in ``batch`` with a single ``predict`` call.
+
+    Args:
+      batch: examples to predict. They are stacked along a new leading axis,
+        so numpy.stack requires them to share one shape.
+      model: any object with a ``predict`` method that accepts the stacked
+        array and yields one prediction per row.
+
+    Returns:
+      A list holding one PredictionResult per example, pairing the original
+      example with its prediction, in batch order.
+    """
+    if not batch:
+      # numpy.stack raises ValueError on an empty sequence; with nothing to
+      # predict there is no reason to call the model at all.
+      return []
+    # Vectorize the data so the model is invoked once per batch rather than
+    # once per example.
+    vectorized_batch = numpy.stack(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+    """Returns the summed sys.getsizeof of every element in the batch."""
+    return sum(sys.getsizeof(element) for element in batch)
+
+
+class SklearnModelLoader(ModelLoader):
+  """Loads a pickle- or joblib-serialized model for SklearnInferenceRunner."""
+  def __init__(
+      self,
+      model_file_type: ModelFileType = ModelFileType.PICKLE,
+      model_uri: str = ''):
+    """Records where the model lives and how it was serialized.
+
+    Args:
+      model_file_type: serialization format of the model file.
+      model_uri: location of the model file; it is opened via FileSystems.
+    """
+    self._model_file_type = model_file_type
+    self._model_uri = model_uri
+    self._inference_runner = SklearnInferenceRunner()
+
+  def load_model(self):
+    """Loads and initializes a model for processing.
+
+    Returns:
+      The object deserialized from the model file.
+
+    Raises:
+      ImportError: the file type is JOBLIB but joblib cannot be imported.
+      TypeError: the file type is neither PICKLE nor JOBLIB.
+    """
+    # NOTE: pickle and joblib can both execute arbitrary code while
+    # deserializing, so model_uri must only ever point at trusted files.
+    file = FileSystems.open(self._model_uri, 'rb')
+    try:
+      if self._model_file_type == ModelFileType.PICKLE:
+        return pickle.load(file)
+      if self._model_file_type == ModelFileType.JOBLIB:
+        # joblib is optional. Import it here rather than testing the
+        # module-level name: when that import failed the name is unbound,
+        # so a truthiness check would raise NameError instead of the
+        # intended ImportError.
+        try:
+          import joblib
+        except ImportError as e:
+          raise ImportError(
+              'Joblib not available in SklearnModelLoader.') from e
+        return joblib.load(file)
+      raise TypeError('Unsupported serialization type.')
+    finally:
+      # Release the handle whether loading succeeded or raised.
+      file.close()
+
+  def get_inference_runner(self) -> SklearnInferenceRunner:
+    """Returns the runner instance created in the constructor."""
+    return self._inference_runner
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py
new file mode 100644
index 00000000000..6f7ed928de7
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import pickle
+import platform
+import shutil
+import sys
+import tempfile
+import unittest
+
+import joblib
+import numpy
+from sklearn import svm
+
+import apache_beam as beam
+import apache_beam.ml.inference.api as api
+import apache_beam.ml.inference.base as base
+from apache_beam.ml.inference.sklearn_loader import ModelFileType
+from apache_beam.ml.inference.sklearn_loader import SklearnInferenceRunner
+from apache_beam.ml.inference.sklearn_loader import SklearnModelLoader
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+def _compare_prediction_result(a, b):
+  """Tells whether two prediction results carry the same data.
+
+  Examples are compared with numpy.array_equal and inferences with ``==``.
+  """
+  same_example = numpy.array_equal(a.example, b.example)
+  same_inference = a.inference == b.inference
+  return same_inference and same_example
+
+
+class FakeModel:
+  """Model stand-in that sums along axis 1 and counts predict() calls."""
+  def __init__(self):
+    # Number of times predict() has been invoked on this instance.
+    self.total_predict_calls = 0
+
+  def predict(self, input_vector: numpy.array):
+    """Counts the call, then returns the sum of the input over axis 1."""
+    self.total_predict_calls = self.total_predict_calls + 1
+    row_sums = numpy.sum(input_vector, axis=1)
+    return row_sums
+
+
+def build_model():
+  """Returns an svm.SVC fitted on two 2-D points labelled 0 and 1."""
+  features = [[0, 0], [1, 1]]
+  labels = [0, 1]
+  classifier = svm.SVC()
+  classifier.fit(features, labels)
+  return classifier
+
+
+class SkLearnRunInferenceTest(unittest.TestCase):
+  """Tests for SklearnInferenceRunner and SklearnModelLoader."""
+  def setUp(self):
+    # Scratch directory that holds the serialized model files.
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_predict_output(self):
+    """Each example is paired with the prediction made for it."""
+    fake_model = FakeModel()
+    inference_runner = SklearnInferenceRunner()
+    batched_examples = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    expected_predictions = [
+        api.PredictionResult(numpy.array([1, 2, 3]), 6),
+        api.PredictionResult(numpy.array([4, 5, 6]), 15),
+        api.PredictionResult(numpy.array([7, 8, 9]), 24)
+    ]
+    inferences = list(
+        inference_runner.run_inference(batched_examples, fake_model))
+    # zip() stops at the shorter input, so check the count explicitly;
+    # otherwise missing results would go unnoticed.
+    self.assertEqual(len(expected_predictions), len(inferences))
+    for actual, expected in zip(inferences, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_data_vectorized(self):
+    """A whole batch results in exactly one predict() call."""
+    fake_model = FakeModel()
+    inference_runner = SklearnInferenceRunner()
+    batched_examples = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    # even though there are 3 examples, the data should
+    # be vectorized and only 1 call should happen.
+    inference_runner.run_inference(batched_examples, fake_model)
+    self.assertEqual(1, fake_model.total_predict_calls)
+
+  def test_num_bytes(self):
+    """get_num_bytes adds up sys.getsizeof over the batch elements."""
+    inference_runner = SklearnInferenceRunner()
+    batched_examples_int = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        sys.getsizeof(batched_examples_int[0]) * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        numpy.array([1.0, 2.0, 3.0]),
+        numpy.array([4.1, 5.2, 6.3]),
+        numpy.array([7.7, 8.8, 9.9])
+    ]
+    self.assertEqual(
+        sys.getsizeof(batched_examples_float[0]) * 3,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pickled(self):
+    """A pickled model is loaded and applied inside a pipeline."""
+    temp_file_name = os.path.join(self.tmpdir, 'pickled_file')
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_model(), file)
+    with TestPipeline() as pipeline:
+      examples = [numpy.array([0, 0]), numpy.array([1, 1])]
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      # TODO(BEAM-14305) Test against the public API.
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_joblib(self):
+    """A joblib-dumped model is loaded and applied inside a pipeline."""
+    temp_file_name = os.path.join(self.tmpdir, 'joblib_file')
+    with open(temp_file_name, 'wb') as file:
+      joblib.dump(build_model(), file)
+    with TestPipeline() as pipeline:
+      examples = [numpy.array([0, 0]), numpy.array([1, 1])]
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      # TODO(BEAM-14305) Test against the public API.
+
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(
+              model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  def test_bad_file_raises(self):
+    """A pipeline given '/var/bad_file_name' as model URI raises RuntimeError."""
+    with self.assertRaises(RuntimeError):
+      with TestPipeline() as pipeline:
+        examples = [numpy.array([0, 0])]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # TODO(BEAM-14305) Test against the public API.
+        _ = pcoll | base.RunInference(
+            SklearnModelLoader(model_uri='/var/bad_file_name'))
+        pipeline.run()
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_bad_input_type_raises(self):
+    """A model_file_type of None is rejected by load_model with TypeError."""
+    with self.assertRaisesRegex(TypeError, 'Unsupported serialization type'):
+      with tempfile.NamedTemporaryFile() as file:
+        model_loader = SklearnModelLoader(
+            model_uri=file.name, model_file_type=None)
+        model_loader.load_model()
+
+
+# Run the test suite when this file is executed directly as a script.
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
index ddafbc147b5..cdbc5c4e6cb 100644
--- a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
@@ -35,9 +35,10 @@ Run as
import logging
import time
+from scipy import stats
+
import apache_beam as beam
from apache_beam.tools import utils
-from scipy import stats
def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 36ccd1d3a43..42a794120f6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -159,6 +159,7 @@ if sys.platform == 'win32' and sys.maxsize <= 2**32:
REQUIRED_TEST_PACKAGES = [
'freezegun>=0.3.12',
+ 'joblib>=1.1.0',
'mock>=1.0.1,<3.0.0',
'pandas<2.0.0',
'parameterized>=0.7.1,<0.8.0',
@@ -169,6 +170,7 @@ REQUIRED_TEST_PACKAGES = [
'pytest>=4.4.0,<5.0',
'pytest-xdist>=1.29.0,<2',
'pytest-timeout>=1.3.3,<2',
+ 'scikit-learn>=0.20.0',
'sqlalchemy>=1.3,<2.0',
'psycopg2-binary>=2.8.5,<3.0.0',
'testcontainers[mysql]>=3.0.3,<4.0.0',