You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/22 18:54:20 UTC

[beam] branch master updated: [BEAM-13983] Sklearn Loader for RunInference (#17368)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0362711d5a9 [BEAM-13983] Sklearn Loader for RunInference (#17368)
0362711d5a9 is described below

commit 0362711d5a91404795ddac38add9147f0f9b2c0a
Author: Ryan Thompson <ry...@gmail.com>
AuthorDate: Fri Apr 22 14:54:09 2022 -0400

    [BEAM-13983] Sklearn Loader for RunInference (#17368)
    
    * first commit
    
    * added sklearn_loader
    
    * added sklearn
    
    * removed base remove
    
    * added unit tests
    
    * lint
    
    * fixed exception test
    
    * added more unit tests
    
    * linted
    
    * added joblib to test packages
    
    * linted imports
    
    * added sklearn to the test queue
    
    * move scikitlearn
    
    * fixed unit tests
    
    * fixed asserts
    
    * linted imports
    
    * renamed SKLearn to Sklearn
    
    * added unit test for bad serialization type
    
    * use direct imports
    
    * removed unused imports
    
    * linted
    
    * renamed serialization url
    
    * fixed test with new name
    
    * removed joblib as required dep
    
    * used tmpfile close instead of flush, to see if that fixes windows permission issue
    
    * removed skip if
    
    * added more specific assert
    
    * fixed unit tests
    
    * changed to hopefully work with windows tests
    
    * changed to file close
    
    * remove windows version
    
    * added platform import
    
    * fixed import order
    
    * skip temporary file opening on windows unit tests
    
    * adding sklearn deps somehow affects the lint order
    
    * ignore bad_input_type test
---
 .../apache_beam/examples/complete/distribopt.py    |   2 +-
 .../apache_beam/ml/inference/sklearn_loader.py     |  78 +++++++++
 .../ml/inference/sklearn_loader_test.py            | 176 +++++++++++++++++++++
 .../apache_beam/tools/map_fn_microbenchmark.py     |   3 +-
 sdks/python/setup.py                               |   2 +
 5 files changed, 259 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/distribopt.py b/sdks/python/apache_beam/examples/complete/distribopt.py
index fb797a05dda..89c312fcbf5 100644
--- a/sdks/python/apache_beam/examples/complete/distribopt.py
+++ b/sdks/python/apache_beam/examples/complete/distribopt.py
@@ -58,12 +58,12 @@ import uuid
 from collections import defaultdict
 
 import numpy as np
+from scipy.optimize import minimize
 
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
-from scipy.optimize import minimize
 
 
 class Simulator(object):
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader.py b/sdks/python/apache_beam/ml/inference/sklearn_loader.py
new file mode 100644
index 00000000000..53adae0cbf4
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/sklearn_loader.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import enum
+import pickle
+import sys
+from typing import Any
+from typing import Iterable
+from typing import List
+
+import numpy
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+try:
+  import joblib
+except ImportError:
+  # joblib is an optional dependency. Bind the name to None so that the
+  # `if not joblib` availability check in SklearnModelLoader.load_model can
+  # raise its ImportError instead of failing with a NameError on an
+  # undefined name.
+  joblib = None
+
+
+@enum.unique
+class ModelFileType(enum.Enum):
+  """Serialization formats that SklearnModelLoader.load_model can read."""
+  # The model file is deserialized with pickle.load.
+  PICKLE = 1
+  # The model file is deserialized with joblib.load (optional dependency).
+  JOBLIB = 2
+
+
+class SklearnInferenceRunner(InferenceRunner):
+  """Runs batched predictions against a model that exposes ``predict``."""
+  def run_inference(self, batch: List[numpy.ndarray],
+                    model: Any) -> Iterable[PredictionResult]:
+    """Predicts on a whole batch with a single ``model.predict`` call.
+
+    Args:
+      batch: the examples to run through the model. They are stacked along a
+        new leading axis, so they must all share the same shape.
+      model: any object with a ``predict`` method that accepts the stacked
+        array; it is expected to return one prediction per example.
+
+    Returns:
+      A list of PredictionResult objects pairing each example with its
+      prediction, in batch order. An empty batch yields an empty list.
+    """
+    if not batch:
+      # numpy.stack raises ValueError when it is given no arrays.
+      return []
+    # vectorize data for better performance
+    vectorized_batch = numpy.stack(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
+
+  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+    """Returns the number of bytes of data for a batch.
+
+    The size is the sum of ``sys.getsizeof`` over the batch elements.
+    """
+    return sum(sys.getsizeof(element) for element in batch)
+
+
+class SklearnModelLoader(ModelLoader):
+  """Loads a serialized model from a URI and supplies its inference runner."""
+  def __init__(
+      self,
+      model_file_type: ModelFileType = ModelFileType.PICKLE,
+      model_uri: str = ''):
+    """Stores where the model lives and how it was serialized.
+
+    Args:
+      model_file_type: serialization format of the file at ``model_uri``.
+      model_uri: path handed to ``FileSystems.open`` when the model is loaded.
+    """
+    self._model_file_type = model_file_type
+    self._model_uri = model_uri
+    self._inference_runner = SklearnInferenceRunner()
+
+  def load_model(self):
+    """Loads and initializes a model for processing.
+
+    NOTE: both pickle and joblib execute code embedded in the file while
+    deserializing, so ``model_uri`` must only point at trusted files.
+
+    Returns:
+      The object deserialized from the file at ``model_uri``.
+
+    Raises:
+      ImportError: if the JOBLIB format is requested but joblib is not
+        available.
+      TypeError: if the configured file type is neither PICKLE nor JOBLIB.
+    """
+    # FileSystems.open returns a handle for reading; its second positional
+    # parameter is the MIME type, not a file mode, so no 'rb' is passed.
+    file = FileSystems.open(self._model_uri)
+    try:
+      if self._model_file_type == ModelFileType.PICKLE:
+        return pickle.load(file)
+      elif self._model_file_type == ModelFileType.JOBLIB:
+        if not joblib:
+          raise ImportError('Joblib not available in SklearnModelLoader.')
+        return joblib.load(file)
+      raise TypeError('Unsupported serialization type.')
+    finally:
+      # Release the handle on every path, including the error paths above.
+      file.close()
+
+  def get_inference_runner(self) -> SklearnInferenceRunner:
+    """Returns the SklearnInferenceRunner created in the constructor."""
+    return self._inference_runner
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py
new file mode 100644
index 00000000000..6f7ed928de7
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import pickle
+import platform
+import shutil
+import sys
+import tempfile
+import unittest
+
+import joblib
+import numpy
+from sklearn import svm
+
+import apache_beam as beam
+import apache_beam.ml.inference.api as api
+import apache_beam.ml.inference.base as base
+from apache_beam.ml.inference.sklearn_loader import ModelFileType
+from apache_beam.ml.inference.sklearn_loader import SklearnInferenceRunner
+from apache_beam.ml.inference.sklearn_loader import SklearnModelLoader
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+def _compare_prediction_result(a, b):
+  """Tells whether two prediction results agree on inference and example.
+
+  Examples are compared with numpy.array_equal because ``==`` on arrays is
+  elementwise and does not produce a single truth value.
+  """
+  inferences_match = a.inference == b.inference
+  if not inferences_match:
+    return inferences_match
+  return numpy.array_equal(a.example, b.example)
+
+
+class FakeModel:
+  """Stand-in model that sums each input row and counts predict() calls."""
+  def __init__(self):
+    # Number of times predict() has been invoked on this instance.
+    self.total_predict_calls = 0
+
+  def predict(self, input_vector: numpy.array):
+    """Returns the sums of input_vector taken along axis 1."""
+    self.total_predict_calls = self.total_predict_calls + 1
+    row_sums = numpy.sum(input_vector, axis=1)
+    return row_sums
+
+
+def build_model():
+  """Returns an svm.SVC fitted on the points [0, 0] -> 0 and [1, 1] -> 1."""
+  features = [[0, 0], [1, 1]]
+  labels = [0, 1]
+  # fit() returns the fitted estimator itself, so it can be returned directly.
+  return svm.SVC().fit(features, labels)
+
+
+class SkLearnRunInferenceTest(unittest.TestCase):
+  """Tests for SklearnInferenceRunner and SklearnModelLoader."""
+  def setUp(self):
+    # Scratch directory that holds the serialized model files.
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_predict_output(self):
+    """Each example is paired with the prediction the model made for it."""
+    fake_model = FakeModel()
+    inference_runner = SklearnInferenceRunner()
+    batched_examples = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    expected_predictions = [
+        api.PredictionResult(numpy.array([1, 2, 3]), 6),
+        api.PredictionResult(numpy.array([4, 5, 6]), 15),
+        api.PredictionResult(numpy.array([7, 8, 9]), 24)
+    ]
+    inferences = list(
+        inference_runner.run_inference(batched_examples, fake_model))
+    # zip() stops at the shorter input, so a missing result would go
+    # unnoticed without an explicit length check.
+    self.assertEqual(len(expected_predictions), len(inferences))
+    for actual, expected in zip(inferences, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_data_vectorized(self):
+    """A batch is sent to the model in a single predict() call."""
+    fake_model = FakeModel()
+    inference_runner = SklearnInferenceRunner()
+    batched_examples = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    # even though there are 3 examples, the data should
+    # be vectorized and only 1 call should happen.
+    inference_runner.run_inference(batched_examples, fake_model)
+    self.assertEqual(1, fake_model.total_predict_calls)
+
+  def test_num_bytes(self):
+    """get_num_bytes adds up sys.getsizeof of every element in the batch."""
+    inference_runner = SklearnInferenceRunner()
+    batched_examples_int = [
+        numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
+    ]
+    self.assertEqual(
+        sys.getsizeof(batched_examples_int[0]) * 3,
+        inference_runner.get_num_bytes(batched_examples_int))
+
+    batched_examples_float = [
+        numpy.array([1.0, 2.0, 3.0]),
+        numpy.array([4.1, 5.2, 6.3]),
+        numpy.array([7.7, 8.8, 9.9])
+    ]
+    self.assertEqual(
+        sys.getsizeof(batched_examples_float[0]) * 3,
+        inference_runner.get_num_bytes(batched_examples_float))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pickled(self):
+    """A pickled model is loaded and used by RunInference in a pipeline."""
+    temp_file_name = os.path.join(self.tmpdir, 'pickled_file')
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_model(), file)
+    with TestPipeline() as pipeline:
+      examples = [numpy.array([0, 0]), numpy.array([1, 1])]
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      # TODO(BEAM-14305) Test against the public API.
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_joblib(self):
+    """A joblib-dumped model is loaded and used by RunInference."""
+    temp_file_name = os.path.join(self.tmpdir, 'joblib_file')
+    with open(temp_file_name, 'wb') as file:
+      joblib.dump(build_model(), file)
+    with TestPipeline() as pipeline:
+      examples = [numpy.array([0, 0]), numpy.array([1, 1])]
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      # TODO(BEAM-14305) Test against the public API.
+
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(
+              model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  def test_bad_file_raises(self):
+    """Running the pipeline against an unreadable model path fails."""
+    with self.assertRaises(RuntimeError):
+      with TestPipeline() as pipeline:
+        examples = [numpy.array([0, 0])]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # TODO(BEAM-14305) Test against the public API.
+        _ = pcoll | base.RunInference(
+            SklearnModelLoader(model_uri='/var/bad_file_name'))
+        pipeline.run()
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_bad_input_type_raises(self):
+    """An unknown model_file_type is rejected with a TypeError."""
+    with self.assertRaisesRegex(TypeError, 'Unsupported serialization type'):
+      with tempfile.NamedTemporaryFile() as file:
+        model_loader = SklearnModelLoader(
+            model_uri=file.name, model_file_type=None)
+        model_loader.load_model()
+
+
+# Run the tests in this module when the file is executed as a script.
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
index ddafbc147b5..cdbc5c4e6cb 100644
--- a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
@@ -35,9 +35,10 @@ Run as
 import logging
 import time
 
+from scipy import stats
+
 import apache_beam as beam
 from apache_beam.tools import utils
-from scipy import stats
 
 
 def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 36ccd1d3a43..42a794120f6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -159,6 +159,7 @@ if sys.platform == 'win32' and sys.maxsize <= 2**32:
 
 REQUIRED_TEST_PACKAGES = [
     'freezegun>=0.3.12',
+    'joblib>=1.1.0',
     'mock>=1.0.1,<3.0.0',
     'pandas<2.0.0',
     'parameterized>=0.7.1,<0.8.0',
@@ -169,6 +170,7 @@ REQUIRED_TEST_PACKAGES = [
     'pytest>=4.4.0,<5.0',
     'pytest-xdist>=1.29.0,<2',
     'pytest-timeout>=1.3.3,<2',
+    'scikit-learn>=0.20.0',
     'sqlalchemy>=1.3,<2.0',
     'psycopg2-binary>=2.8.5,<3.0.0',
     'testcontainers[mysql]>=3.0.3,<4.0.0',