You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by da...@apache.org on 2022/09/15 13:13:58 UTC
[beam] branch master updated: Pass namespace through RunInference transform (#23182)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f7b31ebb033 Pass namespace through RunInference transform (#23182)
f7b31ebb033 is described below
commit f7b31ebb03378dbb0254bd6f8022b0f6272b1e60
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Thu Sep 15 09:13:47 2022 -0400
Pass namespace through RunInference transform (#23182)
* Pass namespace through RunInference transform
* Refactor namespace to metrics_namespace
* Update doc string
* Testing an example with a custom namespace
* Add a stage name to the base RunInference DoFn
* Unit test for custom namespace
* Revert "Testing an example with a custom namespace"
This reverts commit 39163fdc311ba87403754a453efad008e48fbf77.
* Update sdks/python/apache_beam/ml/inference/base_test.py
* Refactor RunInference default namespace
* Fixup unit test
* Refactor ModelHandler default namespace for consistency
---
sdks/python/apache_beam/ml/inference/base.py | 23 ++++++++++++++++------
sdks/python/apache_beam/ml/inference/base_test.py | 19 ++++++++++++++++++
.../apache_beam/ml/inference/pytorch_inference.py | 4 ++--
.../ml/inference/pytorch_inference_test.py | 3 +--
.../apache_beam/ml/inference/sklearn_inference.py | 14 +++++++++++++
.../pytorch_image_classification_benchmarks.py | 2 +-
.../pytorch_language_modeling_benchmarks.py | 2 +-
7 files changed, 55 insertions(+), 12 deletions(-)
diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 075260ec3ec..8b88809d329 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -272,7 +272,8 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
self,
model_handler: ModelHandler[ExampleT, PredictionT, Any],
clock=time,
- inference_args: Optional[Dict[str, Any]] = None):
+ inference_args: Optional[Dict[str, Any]] = None,
+ metrics_namespace: Optional[str] = None):
"""A transform that takes a PCollection of examples (or features) to be used
on an ML model. It will then output inferences (or predictions) for those
examples in a PCollection of PredictionResults, containing the input
@@ -289,10 +290,12 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
clock: A clock implementing time_ns. *Used for unit testing.*
inference_args: Extra arguments for models whose inference call requires
extra parameters.
+ metrics_namespace: Namespace of the transform to collect metrics.
"""
self._model_handler = model_handler
self._inference_args = inference_args
self._clock = clock
+ self._metrics_namespace = metrics_namespace
# TODO(BEAM-14046): Add and link to help documentation.
@classmethod
@@ -321,9 +324,10 @@ class RunInference(beam.PTransform[beam.PCollection[ExampleT],
# TODO(https://github.com/apache/beam/issues/21440): Hook into the
# batching DoFn APIs.
| beam.BatchElements(**self._model_handler.batch_elements_kwargs())
- | (
+ | 'BeamML_RunInference' >> (
beam.ParDo(
- _RunInferenceDoFn(self._model_handler, self._clock),
+ _RunInferenceDoFn(
+ self._model_handler, self._clock, self._metrics_namespace),
self._inference_args).with_resource_hints(**resource_hints)))
@@ -378,17 +382,22 @@ class _MetricsCollector:
class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
def __init__(
- self, model_handler: ModelHandler[ExampleT, PredictionT, Any], clock):
+ self,
+ model_handler: ModelHandler[ExampleT, PredictionT, Any],
+ clock,
+ metrics_namespace):
"""A DoFn implementation generic to frameworks.
Args:
model_handler: An implementation of ModelHandler.
clock: A clock implementing time_ns. *Used for unit testing.*
+ metrics_namespace: Namespace of the transform to collect metrics.
"""
self._model_handler = model_handler
self._shared_model_handle = shared.Shared()
self._clock = clock
self._model = None
+ self._metrics_namespace = metrics_namespace
def _load_model(self):
def load():
@@ -409,8 +418,10 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
return self._shared_model_handle.acquire(load)
def setup(self):
- self._metrics_collector = _MetricsCollector(
- self._model_handler.get_metrics_namespace())
+ metrics_namespace = (
+ self._metrics_namespace) if self._metrics_namespace else (
+ self._model_handler.get_metrics_namespace())
+ self._metrics_collector = _MetricsCollector(metrics_namespace)
self._model = self._load_model()
def process(self, batch, inference_args):
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index ca79a3cd3a3..278485666a0 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -142,6 +142,25 @@ class RunInferenceBaseTest(unittest.TestCase):
inference_args=inference_args)
assert_that(actual, equal_to(examples), label='assert:inferences')
+ def test_run_inference_metrics_with_custom_namespace(self):
+ metrics_namespace = 'my_custom_namespace'
+ pipeline = TestPipeline()
+ examples = [1, 5, 3, 10]
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ _ = pcoll | base.RunInference(
+ FakeModelHandler(), metrics_namespace=metrics_namespace)
+ result = pipeline.run()
+ result.wait_until_finish()
+
+ metrics_filter = MetricsFilter().with_namespace(namespace=metrics_namespace)
+ metrics = result.metrics().query(metrics_filter)
+ assert len(metrics['counters']) != 0
+ assert len(metrics['distributions']) != 0
+
+ metrics_filter = MetricsFilter().with_namespace(namespace='fake_namespace')
+ metrics = result.metrics().query(metrics_filter)
+ assert len(metrics['counters']) == len(metrics['distributions']) == 0
+
def test_unexpected_inference_args_passed(self):
with self.assertRaisesRegex(ValueError, r'inference_args were provided'):
with TestPipeline() as pipeline:
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index c801e193daf..d97205937e2 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -196,7 +196,7 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
Returns:
A namespace for metrics collected by the RunInference transform.
"""
- return 'RunInferencePytorch'
+ return 'BeamML_PyTorch'
def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
pass
@@ -316,7 +316,7 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, torch.Tensor],
Returns:
A namespace for metrics collected by the RunInference transform.
"""
- return 'RunInferencePytorch'
+ return 'BeamML_PyTorch'
def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
pass
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 0a8f087b6d3..32036f43de8 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -327,8 +327,7 @@ class PytorchRunInferenceTest(unittest.TestCase):
def test_namespace(self):
inference_runner = TestPytorchModelHandlerForInferenceOnly(
torch.device('cpu'))
- self.assertEqual(
- 'RunInferencePytorch', inference_runner.get_metrics_namespace())
+ self.assertEqual('BeamML_PyTorch', inference_runner.get_metrics_namespace())
@pytest.mark.uses_pytorch
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 1f49b10fb75..8fd1899ef96 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -141,6 +141,13 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
"""
return sum(sys.getsizeof(element) for element in batch)
+ def get_metrics_namespace(self) -> str:
+ """
+ Returns:
+ A namespace for metrics collected by the RunInference transform.
+ """
+ return 'BeamML_Sklearn'
+
@experimental(extra_message="No backwards-compatibility guarantees.")
class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
@@ -211,3 +218,10 @@ class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
The number of bytes of data for a batch.
"""
return sum(df.memory_usage(deep=True).sum() for df in batch)
+
+ def get_metrics_namespace(self) -> str:
+ """
+ Returns:
+ A namespace for metrics collected by the RunInference transform.
+ """
+ return 'BeamML_Sklearn'
diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
index 3a55e5751b0..eafa9fde38d 100644
--- a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
+++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_classification_benchmarks.py
@@ -30,7 +30,7 @@ class PytorchVisionBenchmarkTest(LoadTest):
def __init__(self):
# TODO (https://github.com/apache/beam/issues/23008)
# make get_namespace() method in RunInference static
- self.metrics_namespace = 'RunInferencePytorch'
+ self.metrics_namespace = 'BeamML_PyTorch'
super().__init__(metrics_namespace=self.metrics_namespace)
def test(self):
diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
index 9d2f6c774bb..1d6ecb2bd43 100644
--- a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
+++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_language_modeling_benchmarks.py
@@ -26,7 +26,7 @@ class PytorchLanguageModelingBenchmarkTest(LoadTest):
def __init__(self):
# TODO (https://github.com/apache/beam/issues/23008):
# make get_namespace() method in RunInference static
- self.metrics_namespace = 'RunInferencePytorch'
+ self.metrics_namespace = 'BeamML_PyTorch'
super().__init__(metrics_namespace=self.metrics_namespace)
def test(self):