You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/09/15 13:13:58 UTC

[beam] branch master updated: Pass namespace through RunInference transform (#23182)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b31ebb033 Pass namespace through RunInference transform (#23182)
f7b31ebb033 is described below

commit f7b31ebb03378dbb0254bd6f8022b0f6272b1e60
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Thu Sep 15 09:13:47 2022 -0400

    Pass namespace through RunInference transform (#23182)
    
    * Pass namespace through RunInference transform
    
    * Refactor namespace to metrics_namespace
    
    * Update doc string
    Testing an example with a custom namespace
    
    * Add a stage name to the base RunInference DoFn
    
    * Unit test for custom namespace
    Revert "Testing an example with a custom namespace"
    
    This reverts commit 39163fdc311ba87403754a453efad008e48fbf77.
    Update sdks/python/apache_beam/ml/inference/base_test.py
    
    * Refactor RunInference default namespace
    
    * Fixup unit test
    
    * Refactor ModelHandler default namespace for consistency
---
 sdks/python/apache_beam/ml/inference/base.py       | 23 ++++++++++++++++------
 sdks/python/apache_beam/ml/inference/base_test.py  | 19 ++++++++++++++++++
 .../apache_beam/ml/inference/pytorch_inference.py  |  4 ++--
 .../ml/inference/pytorch_inference_test.py         |  3 +--
 .../apache_beam/ml/inference/sklearn_inference.py  | 14 +++++++++++++
 .../pytorch_image_classification_benchmarks.py     |  2 +-
 .../pytorch_language_modeling_benchmarks.py        |  2 +-
 7 files changed, 55 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 075260ec3ec..8b88809d329 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -272,7 +272,8 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
       self,
       model_handler: ModelHandler[ExampleT, PredictionT, Any],
       clock=time,
-      inference_args: Optional[Dict[str, Any]] = None):
+      inference_args: Optional[Dict[str, Any]] = None,
+      metrics_namespace: Optional[str] = None):
     """A transform that takes a PCollection of examples (or features) to be used
     on an ML model. It will then output inferences (or predictions) for those
     examples in a PCollection of PredictionResults, containing the input
@@ -289,10 +290,12 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
         clock: A clock implementing time_ns. *Used for unit testing.*
         inference_args: Extra arguments for models whose inference call requires
           extra parameters.
+        metrics_namespace: Namespace of the transform to collect metrics.
     """
     self._model_handler = model_handler
     self._inference_args = inference_args
     self._clock = clock
+    self._metrics_namespace = metrics_namespace
 
   # TODO(BEAM-14046): Add and link to help documentation.
   @classmethod
@@ -321,9 +324,10 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
         # TODO(https://github.com/apache/beam/issues/21440): Hook into the
         # batching DoFn APIs.
         | beam.BatchElements(**self._model_handler.batch_elements_kwargs())
-        | (
+        | 'BeamML_RunInference' >> (
             beam.ParDo(
-                _RunInferenceDoFn(self._model_handler, self._clock),
+                _RunInferenceDoFn(
+                    self._model_handler, self._clock, self._metrics_namespace),
                 self._inference_args).with_resource_hints(**resource_hints)))
 
 
@@ -378,17 +382,22 @@ class _MetricsCollector:
 
 class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
   def __init__(
-      self, model_handler: ModelHandler[ExampleT, PredictionT, Any], clock):
+      self,
+      model_handler: ModelHandler[ExampleT, PredictionT, Any],
+      clock,
+      metrics_namespace):
     """A DoFn implementation generic to frameworks.
 
       Args:
         model_handler: An implementation of ModelHandler.
         clock: A clock implementing time_ns. *Used for unit testing.*
+        metrics_namespace: Namespace of the transform to collect metrics.
     """
     self._model_handler = model_handler
     self._shared_model_handle = shared.Shared()
     self._clock = clock
     self._model = None
+    self._metrics_namespace = metrics_namespace
 
   def _load_model(self):
     def load():
@@ -409,8 +418,10 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
     return self._shared_model_handle.acquire(load)
 
   def setup(self):
-    self._metrics_collector = _MetricsCollector(
-        self._model_handler.get_metrics_namespace())
+    metrics_namespace = (
+        self._metrics_namespace) if self._metrics_namespace else (
+            self._model_handler.get_metrics_namespace())
+    self._metrics_collector = _MetricsCollector(metrics_namespace)
     self._model = self._load_model()
 
   def process(self, batch, inference_args):
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index ca79a3cd3a3..278485666a0 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -142,6 +142,25 @@ class RunInferenceBaseTest(unittest.TestCase):
           inference_args=inference_args)
       assert_that(actual, equal_to(examples), label='assert:inferences')
 
+  def test_run_inference_metrics_with_custom_namespace(self):
+    metrics_namespace = 'my_custom_namespace'
+    pipeline = TestPipeline()
+    examples = [1, 5, 3, 10]
+    pcoll = pipeline | 'start' >> beam.Create(examples)
+    _ = pcoll | base.RunInference(
+        FakeModelHandler(), metrics_namespace=metrics_namespace)
+    result = pipeline.run()
+    result.wait_until_finish()
+
+    metrics_filter = MetricsFilter().with_namespace(namespace=metrics_namespace)
+    metrics = result.metrics().query(metrics_filter)
+    assert len(metrics['counters']) != 0
+    assert len(metrics['distributions']) != 0
+
+    metrics_filter = MetricsFilter().with_namespace(namespace='fake_namespace')
+    metrics = result.metrics().query(metrics_filter)
+    assert len(metrics['counters']) == len(metrics['distributions']) == 0
+
   def test_unexpected_inference_args_passed(self):
     with self.assertRaisesRegex(ValueError, r'inference_args were provided'):
       with TestPipeline() as pipeline:
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index c801e193daf..d97205937e2 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -196,7 +196,7 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
     Returns:
        A namespace for metrics collected by the RunInference transform.
     """
-    return 'RunInferencePytorch'
+    return 'BeamML_PyTorch'
 
   def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
     pass
@@ -316,7 +316,7 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, torch.Tensor],
     Returns:
        A namespace for metrics collected by the RunInference transform.
     """
-    return 'RunInferencePytorch'
+    return 'BeamML_PyTorch'
 
   def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
     pass
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 0a8f087b6d3..32036f43de8 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -327,8 +327,7 @@ class PytorchRunInferenceTest(unittest.TestCase):
   def test_namespace(self):
     inference_runner = TestPytorchModelHandlerForInferenceOnly(
         torch.device('cpu'))
-    self.assertEqual(
-        'RunInferencePytorch', inference_runner.get_metrics_namespace())
+    self.assertEqual('BeamML_PyTorch', inference_runner.get_metrics_namespace())
 
 
 @pytest.mark.uses_pytorch
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 1f49b10fb75..8fd1899ef96 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -141,6 +141,13 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
     """
     return sum(sys.getsizeof(element) for element in batch)
 
+  def get_metrics_namespace(self) -> str:
+    """
+    Returns:
+       A namespace for metrics collected by the RunInference transform.
+    """
+    return 'BeamML_Sklearn'
+
 
 @experimental(extra_message="No backwards-compatibility guarantees.")
 class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
@@ -211,3 +218,10 @@ class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
       The number of bytes of data for a batch.
     """
     return sum(df.memory_usage(deep=True).sum() for df in batch)
+
+  def get_metrics_namespace(self) -> str:
+    """
+    Returns:
+       A namespace for metrics collected by the RunInference transform.
+    """
+    return 'BeamML_Sklearn'
diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
index 3a55e5751b0..eafa9fde38d 100644
--- a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
+++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
@@ -30,7 +30,7 @@ class PytorchVisionBenchmarkTest(LoadTest):
   def __init__(self):
     # TODO (https://github.com/apache/beam/issues/23008)
     #  make get_namespace() method in RunInference static
-    self.metrics_namespace = 'RunInferencePytorch'
+    self.metrics_namespace = 'BeamML_PyTorch'
     super().__init__(metrics_namespace=self.metrics_namespace)
 
   def test(self):
diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
index 9d2f6c774bb..1d6ecb2bd43 100644
--- a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
+++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
@@ -26,7 +26,7 @@ class PytorchLanguageModelingBenchmarkTest(LoadTest):
   def __init__(self):
     # TODO (https://github.com/apache/beam/issues/23008):
     #  make get_namespace() method in RunInference static
-    self.metrics_namespace = 'RunInferencePytorch'
+    self.metrics_namespace = 'BeamML_PyTorch'
     super().__init__(metrics_namespace=self.metrics_namespace)
 
   def test(self):