Posted to commits@beam.apache.org by da...@apache.org on 2023/08/31 14:50:07 UTC
[beam] branch master updated: Keep track of metrics at the KeyedModelHandler level (#28228)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 205083dd72f Keep track of metrics at the KeyedModelHandler level (#28228)
205083dd72f is described below
commit 205083dd72fd53663d865c0fb5752cc3dfdf428a
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu Aug 31 10:49:58 2023 -0400
Keep track of metrics at the KeyedModelHandler level (#28228)
* Update KeyMhMapping to KeyModelMapping
* Keep track of metrics at the KeyedModelHandler level
* Update returns doc
---
sdks/python/apache_beam/ml/inference/base.py | 126 ++++++++++++++++++----
sdks/python/apache_beam/ml/inference/base_test.py | 84 ++++++++++++---
2 files changed, 177 insertions(+), 33 deletions(-)
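For orientation (this sketch is not part of the commit), the "many model handlers" configuration that this change instruments looks roughly like the following; the model URIs are hypothetical and any concrete unkeyed ModelHandler could stand in for SklearnModelHandlerNumpy:

from apache_beam.ml.inference import base
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Two key cohorts, each backed by its own (hypothetical) model file.
keyed_handler = base.KeyedModelHandler([
    base.KeyModelMapping(
        ['key1'],
        SklearnModelHandlerNumpy(model_uri='gs://bucket/model_a.pkl')),
    base.KeyModelMapping(
        ['key2', 'key3'],
        SklearnModelHandlerNumpy(model_uri='gs://bucket/model_b.pkl')),
])

# keyed_pcoll | base.RunInference(keyed_handler, metrics_namespace='my_ns')
# With this commit, RunInference reports aggregate metrics (e.g.
# 'num_inferences') plus one set per key cohort (e.g. 'key1-_num_inferences'),
# all under the 'my_ns' namespace.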
diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 0964fc46a95..80b73c00675 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -108,6 +108,12 @@ class RunInferenceDLQ(NamedTuple):
failed_postprocessing: Sequence[beam.PCollection]
+class _ModelLoadStats(NamedTuple):
+ model_tag: str
+ load_latency: Optional[int]
+ byte_size: Optional[int]
+
+
ModelMetadata.model_id.__doc__ = """Unique identifier for the model. This can be
a file path or a URL where the model can be accessed. It is used to load
the model for inference."""
@@ -130,17 +136,18 @@ class KeyModelPathMapping(Generic[KeyT]):
Dataclass for mapping 1 or more keys to 1 model path. This is used in
conjunction with a KeyedModelHandler with many model handlers to update
a set of keys' model handlers with the new path. Given
- `KeyModelPathMapping(keys: ['key1', 'key2'], update_path: 'updated/path')`,
- all examples with keys `key1` or `key2` will have their corresponding model
- handler's update_model function called with 'updated/path'. For more
- information see the
- KeyedModelHandler documentation
+ `KeyModelPathMapping(keys: ['key1', 'key2'], update_path: 'updated/path',
+ model_id: 'id1')`, all examples with keys `key1` or `key2` will have their
+ corresponding model handler's update_model function called with
+ 'updated/path' and their metrics will correspond with 'id1'. For more
+ information see the KeyedModelHandler documentation
https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler
documentation and the website section on model updates
https://beam.apache.org/documentation/sdks/python-machine-learning/#automatic-model-refresh
"""
keys: List[KeyT]
update_path: str
+ model_id: str = ''
class ModelHandler(Generic[ExampleT, PredictionT, ModelT]):
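As a quick illustration of the expanded mapping (paths and model ids below are hypothetical), a model update for a KeyedModelHandler with many model handlers might now look like:

from apache_beam.ml.inference import base

update = [
    base.KeyModelPathMapping(
        keys=['key1', 'key2'],
        update_path='gs://bucket/model_v2',
        model_id='v2'),
    base.KeyModelPathMapping(
        keys=['key3'],
        update_path='gs://bucket/other_model_v5',
        model_id='v5'),
]
# After the update, metrics for the 'key1'/'key2' cohort are reported with a
# '<cohort_key>-v2-' prefix, and the 'key3' cohort with '<cohort_key>-v5-'.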
@@ -286,6 +293,12 @@ class ModelHandler(Generic[ExampleT, PredictionT, ModelT]):
https://beam.apache.org/releases/pydoc/current/apache_beam.utils.multi_process_shared.html"""
return False
+ def override_metrics(self, metrics_namespace: str = '') -> bool:
+ """Returns a boolean representing whether or not a model handler will
+ override metrics reporting. If True, RunInference will not report any
+ metrics."""
+ return False
+
class _ModelManager:
"""
@@ -318,13 +331,14 @@ class _ModelManager:
# of this map should last as long as the corresponding entry in _tag_map.
self._proxy_map: Dict[str, multi_process_shared.MultiProcessShared] = {}
- def load(self, key: str) -> str:
+ def load(self, key: str) -> _ModelLoadStats:
"""
Loads the appropriate model for the given key into memory.
Args:
key: the key associated with the model we'd like to load.
Returns:
- the tag we can use to access the model using multi_process_shared.py.
+ _ModelLoadStats with tag, byte size, and latency to load the model. If
+ the model was already loaded, byte size/latency will be None.
"""
# Map the key for a model to a unique tag that will persist until the model
# is released. This needs to be unique between releasing/reacquiring the
@@ -332,6 +346,7 @@ class _ModelManager:
# has been released and deleted.
if key in self._tag_map:
self._tag_map.move_to_end(key)
+ return _ModelLoadStats(self._tag_map[key], None, None)
else:
self._tag_map[key] = uuid.uuid4().hex
@@ -346,12 +361,17 @@ class _ModelManager:
del self._proxy_map[tag_to_remove]
# Load the new model
+ memory_before = _get_current_process_memory_in_bytes()
+ start_time = _to_milliseconds(time.time_ns())
shared_handle = multi_process_shared.MultiProcessShared(
mh.load_model, tag=tag)
model_reference = shared_handle.acquire()
self._proxy_map[tag] = (shared_handle, model_reference)
+ memory_after = _get_current_process_memory_in_bytes()
+ end_time = _to_milliseconds(time.time_ns())
- return tag
+ return _ModelLoadStats(
+ tag, end_time - start_time, memory_after - memory_before)
def increment_max_models(self, increment: int):
"""
@@ -460,12 +480,24 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
must appear in your list of KeyModelPathMappings exactly once. No
additional keys can be added.
+ When using many models defined per key, metrics about inference and model
+ loading will be gathered on an aggregate basis for all keys. These will be
+ reported with no prefix. Metrics will also be gathered on a per key basis.
+ Since some keys can share the same model, only one set of metrics will be
+ reported per key 'cohort'. These will be reported in the form:
+ `<cohort_key>-<metric_name>`, where `<cohort_key>` can be any key selected
+ from the cohort. When model updates occur, the metrics will be reported in
+ the form `<cohort_key>-<model id>-<metric_name>`.
+
Args:
unkeyed: Either (a) an implementation of ModelHandler that does not
require keys or (b) a list of KeyModelMappings mapping lists of keys to
unkeyed ModelHandlers.
"""
+ self._metrics_collectors: Dict[str, _MetricsCollector] = {}
+ self._default_metrics_collector: _MetricsCollector = None
+ self._metrics_namespace = ''
self._single_model = not isinstance(unkeyed, list)
if self._single_model:
if len(unkeyed.get_preprocess_fns()) or len(
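Concretely, those names can be queried off the pipeline result after a run. A sketch assuming a cohort keyed by 'key1' that was later updated with model_id 'v2' (names are hypothetical; the new test below exercises the same pattern):

from apache_beam.metrics.metric import MetricsFilter

result = pipeline.run()
result.wait_until_finish()

# Aggregate across all keys.
agg = result.metrics().query(
    MetricsFilter().with_name('num_inferences'))['counters']
# Per-cohort, prefixed with a key selected from the cohort.
cohort = result.metrics().query(
    MetricsFilter().with_name('key1-_num_inferences'))['counters']
# After an update tagged 'v2', per-cohort metrics also carry the model id,
# e.g. a name of the form 'key1-v2-_num_inferences'.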
@@ -564,17 +596,41 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
predictions = []
for id, keys in key_by_id.items():
mh = self._id_to_mh_map[id]
- keyed_model_tag = model.load(id)
+ loaded_model = model.load(id)
+ keyed_model_tag = loaded_model.model_tag
+ if loaded_model.byte_size is not None:
+ self._metrics_collectors[id].update_load_model_metrics(
+ loaded_model.load_latency, loaded_model.byte_size)
+ self._default_metrics_collector.update_load_model_metrics(
+ loaded_model.load_latency, loaded_model.byte_size)
keyed_model_shared_handle = multi_process_shared.MultiProcessShared(
mh.load_model, tag=keyed_model_tag)
keyed_model = keyed_model_shared_handle.acquire()
- for key in keys:
- unkeyed_batches = batch_by_key[key]
- for inf in mh.run_inference(unkeyed_batches,
- keyed_model,
- inference_args):
- predictions.append((key, inf))
- keyed_model_shared_handle.release(keyed_model)
+ start_time = _to_microseconds(time.time_ns())
+ num_bytes = 0
+ num_elements = 0
+ try:
+ for key in keys:
+ unkeyed_batches = batch_by_key[key]
+ try:
+ for inf in mh.run_inference(unkeyed_batches,
+ keyed_model,
+ inference_args):
+ predictions.append((key, inf))
+ except BaseException as e:
+ self._metrics_collectors[id].failed_batches_counter.inc()
+ self._default_metrics_collector.failed_batches_counter.inc()
+ raise e
+ num_bytes += mh.get_num_bytes(unkeyed_batches)
+ num_elements += len(unkeyed_batches)
+ finally:
+ keyed_model_shared_handle.release(keyed_model)
+ end_time = _to_microseconds(time.time_ns())
+ inference_latency = end_time - start_time
+ self._metrics_collectors[id].update(
+ num_elements, num_bytes, inference_latency)
+ self._default_metrics_collector.update(
+ num_elements, num_bytes, inference_latency)
return predictions
@@ -641,10 +697,12 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
# }
# }
cohort_path_mapping: Dict[KeyT, Dict[str, List[KeyT]]] = {}
+ key_modelid_mapping: Dict[KeyT, str] = {}
seen_keys = set()
for mp in model_paths:
keys = mp.keys
update_path = mp.update_path
+ model_id = mp.model_id
if len(update_path) == 0:
raise ValueError(f'Invalid model update, path for {keys} is empty')
for key in keys:
@@ -658,6 +716,7 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
raise ValueError(
f'Invalid model update: {key} appears in '
'update, but not in the original configuration.')
+ key_modelid_mapping[key] = model_id
cohort_id = self._key_to_id_map[key]
if cohort_id not in cohort_path_mapping:
cohort_path_mapping[cohort_id] = defaultdict(list)
@@ -682,6 +741,9 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
self._id_to_mh_map[cohort_id] = deepcopy(mh)
self._id_to_mh_map[cohort_id].update_model_path(updated_path)
model.update_model_handler(cohort_id, updated_path, old_cohort_id)
+ model_id = key_modelid_mapping[cohort_id]
+ self._metrics_collectors[cohort_id] = _MetricsCollector(
+ self._metrics_namespace, f'{cohort_id}-{model_id}-')
def update_model_path(self, model_path: Optional[str] = None):
if self._single_model:
@@ -697,6 +759,18 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
return self._unkeyed.share_model_across_processes()
return True
+ def override_metrics(self, metrics_namespace: str = '') -> bool:
+ if self._single_model:
+ return self._unkeyed.override_metrics(metrics_namespace)
+
+ self._metrics_namespace = metrics_namespace
+ self._default_metrics_collector = _MetricsCollector(metrics_namespace)
+ for cohort_id in self._id_to_mh_map:
+ self._metrics_collectors[cohort_id] = _MetricsCollector(
+ metrics_namespace, f'{cohort_id}-')
+
+ return True
+
class MaybeKeyedModelHandler(Generic[KeyT, ExampleT, PredictionT, ModelT],
ModelHandler[Union[ExampleT, Tuple[KeyT,
@@ -1188,6 +1262,10 @@ class _MetricsCollector:
self._load_model_latency_milli_secs_cache = load_model_latency_ms
self._model_byte_size_cache = model_byte_size
+ def update_load_model_metrics(self, load_model_latency_ms, model_byte_size):
+ self._load_model_latency_milli_secs.update(load_model_latency_ms)
+ self._model_byte_size.update(model_byte_size)
+
def update(
self,
examples_count: int,
@@ -1244,8 +1322,9 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
memory_after = _get_current_process_memory_in_bytes()
load_model_latency_ms = end_time - start_time
model_byte_size = memory_after - memory_before
- self._metrics_collector.cache_load_model_metrics(
- load_model_latency_ms, model_byte_size)
+ if self._metrics_collector:
+ self._metrics_collector.cache_load_model_metrics(
+ load_model_latency_ms, model_byte_size)
return model
# TODO(https://github.com/apache/beam/issues/21443): Investigate releasing
@@ -1277,6 +1356,8 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
metrics_namespace = (
self._metrics_namespace) if self._metrics_namespace else (
self._model_handler.get_metrics_namespace())
+ if self._model_handler.override_metrics(metrics_namespace):
+ return None
return _MetricsCollector(metrics_namespace, prefix=prefix)
def setup(self):
@@ -1297,7 +1378,8 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
result_generator = self._model_handler.run_inference(
batch, self._model, inference_args)
except BaseException as e:
- self._metrics_collector.failed_batches_counter.inc()
+ if self._metrics_collector:
+ self._metrics_collector.failed_batches_counter.inc()
raise e
predictions = list(result_generator)
@@ -1305,7 +1387,8 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
inference_latency = end_time - start_time
num_bytes = self._model_handler.get_num_bytes(batch)
num_elements = len(batch)
- self._metrics_collector.update(num_elements, num_bytes, inference_latency)
+ if self._metrics_collector:
+ self._metrics_collector.update(num_elements, num_bytes, inference_latency)
return predictions
@@ -1345,7 +1428,8 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
def finish_bundle(self):
# TODO(https://github.com/apache/beam/issues/21435): Figure out why there
# is a cache.
- self._metrics_collector.update_metrics_with_cache()
+ if self._metrics_collector:
+ self._metrics_collector.update_metrics_with_cache()
def _is_darwin() -> bool:
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index af6168c80af..f2146cdd1e5 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -286,6 +286,66 @@ class RunInferenceBaseTest(unittest.TestCase):
actual = pcoll | base.RunInference(base.KeyedModelHandler(mhs))
assert_that(actual, equal_to(expected), label='assert:inferences')
+ def test_run_inference_impl_with_keyed_examples_many_model_handlers_metrics(
+ self):
+ pipeline = TestPipeline()
+ examples = [1, 5, 3, 10]
+ metrics_namespace = 'test_namespace'
+ keyed_examples = [(i, example) for i, example in enumerate(examples)]
+ pcoll = pipeline | 'start' >> beam.Create(keyed_examples)
+ mhs = [
+ base.KeyModelMapping([0],
+ FakeModelHandler(
+ state=200, multi_process_shared=True)),
+ base.KeyModelMapping([1, 2, 3],
+ FakeModelHandler(multi_process_shared=True))
+ ]
+ _ = pcoll | base.RunInference(
+ base.KeyedModelHandler(mhs), metrics_namespace=metrics_namespace)
+ result = pipeline.run()
+ result.wait_until_finish()
+
+ metrics_filter = MetricsFilter().with_namespace(namespace=metrics_namespace)
+ metrics = result.metrics().query(metrics_filter)
+ assert len(metrics['counters']) != 0
+ assert len(metrics['distributions']) != 0
+
+ metrics_filter = MetricsFilter().with_name('0-_num_inferences')
+ metrics = result.metrics().query(metrics_filter)
+ num_inferences_counter_key_0 = metrics['counters'][0]
+ self.assertEqual(num_inferences_counter_key_0.committed, 1)
+
+ metrics_filter = MetricsFilter().with_name('1-_num_inferences')
+ metrics = result.metrics().query(metrics_filter)
+ num_inferences_counter_key_1 = metrics['counters'][0]
+ self.assertEqual(num_inferences_counter_key_1.committed, 3)
+
+ metrics_filter = MetricsFilter().with_name('num_inferences')
+ metrics = result.metrics().query(metrics_filter)
+ num_inferences_counter_aggregate = metrics['counters'][0]
+ self.assertEqual(num_inferences_counter_aggregate.committed, 4)
+
+ metrics_filter = MetricsFilter().with_name('0-_failed_batches_counter')
+ metrics = result.metrics().query(metrics_filter)
+ failed_batches_counter_key_0 = metrics['counters']
+ self.assertEqual(len(failed_batches_counter_key_0), 0)
+
+ metrics_filter = MetricsFilter().with_name('failed_batches_counter')
+ metrics = result.metrics().query(metrics_filter)
+ failed_batches_counter_aggregate = metrics['counters']
+ self.assertEqual(len(failed_batches_counter_aggregate), 0)
+
+ metrics_filter = MetricsFilter().with_name(
+ '0-_load_model_latency_milli_secs')
+ metrics = result.metrics().query(metrics_filter)
+ load_latency_dist_key_0 = metrics['distributions'][0]
+ self.assertEqual(load_latency_dist_key_0.committed.count, 1)
+
+ metrics_filter = MetricsFilter().with_name('load_model_latency_milli_secs')
+ metrics = result.metrics().query(metrics_filter)
+ load_latency_dist_aggregate = metrics['distributions'][0]
+ self.assertEqual(load_latency_dist_aggregate.committed.count, 2)
+
def test_keyed_many_model_handlers_validation(self):
def mult_two(example: str) -> int:
return int(example) * 2
@@ -1285,7 +1345,7 @@ class RunInferenceBaseTest(unittest.TestCase):
'key3': FakeModelHandler(state=3)
}
mm = base._ModelManager(mh_map=mhs)
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
# Use bad_mh's load function to make sure we're actually loading the
# version already stored
bad_mh = FakeModelHandler(state=100)
@@ -1293,8 +1353,8 @@ class RunInferenceBaseTest(unittest.TestCase):
bad_mh.load_model, tag=tag1).acquire()
self.assertEqual(1, model1.predict(10))
- tag2 = mm.load('key2')
- tag3 = mm.load('key3')
+ tag2 = mm.load('key2').model_tag
+ tag3 = mm.load('key3').model_tag
model2 = multi_process_shared.MultiProcessShared(
bad_mh.load_model, tag=tag2).acquire()
model3 = multi_process_shared.MultiProcessShared(
@@ -1308,14 +1368,14 @@ class RunInferenceBaseTest(unittest.TestCase):
mh3 = FakeModelHandler(state=3)
mhs = {'key1': mh1, 'key2': mh2, 'key3': mh3}
mm = base._ModelManager(mh_map=mhs, max_models=2)
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
sh1 = multi_process_shared.MultiProcessShared(mh1.load_model, tag=tag1)
model1 = sh1.acquire()
self.assertEqual(1, model1.predict(10))
model1.increment_state(5)
- tag2 = mm.load('key2')
- tag3 = mm.load('key3')
+ tag2 = mm.load('key2').model_tag
+ tag3 = mm.load('key3').model_tag
sh2 = multi_process_shared.MultiProcessShared(mh2.load_model, tag=tag2)
model2 = sh2.acquire()
sh3 = multi_process_shared.MultiProcessShared(mh3.load_model, tag=tag3)
@@ -1349,7 +1409,7 @@ class RunInferenceBaseTest(unittest.TestCase):
mh1 = FakeModelHandler(state=1)
mhs = {'key1': mh1}
mm = base._ModelManager(mh_map=mhs)
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
sh1 = multi_process_shared.MultiProcessShared(mh1.load_model, tag=tag1)
model1 = sh1.acquire()
self.assertEqual(1, model1.predict(10))
@@ -1359,7 +1419,7 @@ class RunInferenceBaseTest(unittest.TestCase):
self.assertEqual(6, model1.predict(10))
sh1.release(model1)
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
sh1 = multi_process_shared.MultiProcessShared(mh1.load_model, tag=tag1)
model1 = sh1.acquire()
self.assertEqual(1, model1.predict(10))
@@ -1369,7 +1429,7 @@ class RunInferenceBaseTest(unittest.TestCase):
# Shouldn't evict if path is the same as last update
mm.update_model_handler('key1', 'fake/path', 'key1')
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
sh1 = multi_process_shared.MultiProcessShared(mh1.load_model, tag=tag1)
model1 = sh1.acquire()
self.assertEqual(6, model1.predict(10))
@@ -1383,7 +1443,7 @@ class RunInferenceBaseTest(unittest.TestCase):
mhs = {'key1': mh1, 'key2': mh2, 'key3': mh3}
mm = base._ModelManager(mh_map=mhs, max_models=1)
mm.increment_max_models(1)
- tag1 = mm.load('key1')
+ tag1 = mm.load('key1').model_tag
sh1 = multi_process_shared.MultiProcessShared(mh1.load_model, tag=tag1)
model1 = sh1.acquire()
self.assertEqual(1, model1.predict(10))
@@ -1391,8 +1451,8 @@ class RunInferenceBaseTest(unittest.TestCase):
self.assertEqual(6, model1.predict(10))
sh1.release(model1)
- tag2 = mm.load('key2')
- tag3 = mm.load('key3')
+ tag2 = mm.load('key2').model_tag
+ tag3 = mm.load('key3').model_tag
sh2 = multi_process_shared.MultiProcessShared(mh2.load_model, tag=tag2)
model2 = sh2.acquire()
sh3 = multi_process_shared.MultiProcessShared(mh3.load_model, tag=tag3)