You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 22:47:05 UTC
[beam] branch master updated: Inject PCollection id into
element_count metric.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 35c2cd3 Inject PCollection id into element_count metric.
new 05bc3f6 Merge pull request #7942 from Ardagan/RowCount0.1
35c2cd3 is described below
commit 35c2cd3bfcc7c3ee2fa9ae6fcb3e373bcd4e50ef
Author: Mikhail Gryzykhin <mi...@google.com>
AuthorDate: Fri Feb 22 13:54:51 2019 -0800
Inject PCollection id into element_count metric.
---
.../python/apache_beam/metrics/monitoring_infos.py | 9 +-
.../runners/portability/fn_api_runner.py | 5 +
.../runners/portability/fn_api_runner_test.py | 101 ++++++++++++++++++---
.../apache_beam/runners/worker/bundle_processor.py | 40 +++++++-
4 files changed, 137 insertions(+), 18 deletions(-)
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2e9059f..d2fa09d 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -55,6 +55,11 @@ COUNTER_TYPES = set([SUM_INT64_TYPE])
DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
GAUGE_TYPES = set([LATEST_INT64_TYPE])
+# TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels
+PCOLLECTION_LABEL = 'PCOLLECTION'
+PTRANSFORM_LABEL = 'PTRANSFORM'
+TAG_LABEL = 'TAG'
+
def to_timestamp_proto(timestamp_secs):
"""Converts seconds since epoch to a google.protobuf.Timestamp.
@@ -103,9 +108,9 @@ def create_labels(ptransform='', tag=''):
"""
labels = {}
if tag:
- labels['TAG'] = tag
+ labels[TAG_LABEL] = tag
if ptransform:
- labels['PTRANSFORM'] = ptransform
+ labels[PTRANSFORM_LABEL] = ptransform
return labels
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e4cd6a4..a6eda80 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -1375,6 +1375,7 @@ class FnApiMetrics(metrics.metric.MetricResults):
self._gauges = {}
self._user_metrics_only = user_metrics_only
self._init_metrics_from_monitoring_infos(step_monitoring_infos)
+ self._monitoring_infos = step_monitoring_infos
def _init_metrics_from_monitoring_infos(self, step_monitoring_infos):
for smi in step_monitoring_infos.values():
@@ -1415,6 +1416,10 @@ class FnApiMetrics(metrics.metric.MetricResults):
self.DISTRIBUTIONS: distributions,
self.GAUGES: gauges}
+ def monitoring_infos(self):
+ return [item for sublist in self._monitoring_infos.values() for item in
+ sublist]
+
class RunnerResult(runner.PipelineResult):
def __init__(self, state, monitoring_infos_by_stage, metrics_by_stage):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e03240d..c023774 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -562,6 +562,76 @@ class FnApiRunnerTest(unittest.TestCase):
self.assertEqual(dist.committed.mean, 2.0)
self.assertEqual(gaug.committed.value, 3)
+ def test_element_count_metrics(self):
+ class GenerateTwoOutputs(beam.DoFn):
+ def process(self, element):
+ yield str(element) + '1'
+ yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
+ yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
+ yield beam.pvalue.TaggedOutput('ThirdOutput', str(element) + '3')
+
+ class PrintElements(beam.DoFn):
+ def process(self, element):
+ logging.debug(element)
+ yield element
+
+ p = self.create_pipeline()
+ if not isinstance(p.runner, fn_api_runner.FnApiRunner):
+ # This test is inherited by others that may not support the same
+ # internal way of accessing progress metrics.
+ self.skipTest('Metrics not supported.')
+
+ pcoll = p | beam.Create(['a1', 'a2'])
+
+ # pylint: disable=expression-not-assigned
+ pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
+ GenerateTwoOutputs()).with_outputs('SecondOutput',
+ 'ThirdOutput',
+ main='FirstAndMainOutput'))
+
+ # Actually feed pcollection to pardo
+ second_output, third_output, first_output = (pcoll | pardo)
+
+ # Consume some of the elements.
+ merged = ((first_output, second_output, third_output) | beam.Flatten())
+ merged | ('PrintingStep') >> beam.ParDo(PrintElements())
+ second_output | ('PrintingStep2') >> beam.ParDo(PrintElements())
+
+ res = p.run()
+ res.wait_until_finish()
+
+ result_metrics = res.monitoring_metrics()
+
+ def assert_contains_metric(src, urn, pcollection, value):
+ for item in src:
+ if item.urn == urn:
+ if item.labels['PCOLLECTION'] == pcollection:
+ self.assertEqual(item.metric.counter_data.int64_value, value,
+ str(("Metric has incorrect value", value, item)))
+ return
+ self.fail(str(("Metric not found", urn, pcollection, src)))
+
+ counters = result_metrics.monitoring_infos()
+ self.assertFalse([x for x in counters if
+ x.urn == monitoring_infos.ELEMENT_COUNT_URN
+ and
+ monitoring_infos.PCOLLECTION_LABEL not in x.labels])
+
+ assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+ 'Impulse', 1)
+ assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+ 'ref_PCollection_PCollection_1', 2)
+
+ # Skipping other pcollections due to non-deterministic naming for multiple
+ # outputs.
+
+ assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+ 'ref_PCollection_PCollection_5', 8)
+ assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+ 'ref_PCollection_PCollection_6', 8)
+ assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+ 'ref_PCollection_PCollection_7', 2)
+
def test_non_user_metrics(self):
p = self.create_pipeline()
if not isinstance(p.runner, fn_api_runner.FnApiRunner):
@@ -587,7 +657,6 @@ class FnApiRunnerTest(unittest.TestCase):
self.assertEqual(
1, found, "Did not find exactly 1 metric for %s." % metric_key)
urns = [
- monitoring_infos.ELEMENT_COUNT_URN,
monitoring_infos.START_BUNDLE_MSECS_URN,
monitoring_infos.PROCESS_BUNDLE_MSECS_URN,
monitoring_infos.FINISH_BUNDLE_MSECS_URN,
@@ -613,6 +682,7 @@ class FnApiRunnerTest(unittest.TestCase):
# This test is inherited by others that may not support the same
# internal way of accessing progress metrics.
self.skipTest('Progress metrics not supported.')
+ return
_ = (p
| beam.Create([0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS])
@@ -624,6 +694,7 @@ class FnApiRunnerTest(unittest.TestCase):
beam.pvalue.TaggedOutput('once', x),
beam.pvalue.TaggedOutput('twice', x),
beam.pvalue.TaggedOutput('twice', x)]))
+
res = p.run()
res.wait_until_finish()
@@ -675,21 +746,27 @@ class FnApiRunnerTest(unittest.TestCase):
# Test the new MonitoringInfo monitoring format.
self.assertEqual(2, len(res._monitoring_infos_by_stage))
pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
+
if not has_mi_for_ptransform(pregbk_mis, 'Create/Read'):
# The monitoring infos above are actually unordered. Swap.
pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
def assert_has_monitoring_info(
monitoring_infos, urn, labels, value=None, ge_value=None):
+ def contains_labels(monitoring_info, labels):
+ return len([x for x in labels.items() if
+ x[0] in monitoring_info.labels and monitoring_info.labels[
+ x[0]] == x[1]]) == len(labels)
+
# TODO(ajamato): Consider adding a matcher framework
found = 0
- for m in monitoring_infos:
- if m.labels == labels and m.urn == urn:
+ for mi in monitoring_infos:
+ if contains_labels(mi, labels) and mi.urn == urn:
if (ge_value is not None and
- m.metric.counter_data.int64_value >= ge_value):
+ mi.metric.counter_data.int64_value >= ge_value):
found = found + 1
elif (value is not None and
- m.metric.counter_data.int64_value == value):
+ mi.metric.counter_data.int64_value == value):
found = found + 1
ge_value_str = {'ge_value' : ge_value} if ge_value else ''
value_str = {'value' : value} if value else ''
@@ -698,10 +775,10 @@ class FnApiRunnerTest(unittest.TestCase):
(found, (urn, labels, value_str, ge_value_str),))
# pregbk monitoring infos
- labels = {'PTRANSFORM' : 'Create/Read', 'TAG' : 'out'}
+ labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_1'}
assert_has_monitoring_info(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
- labels = {'PTRANSFORM' : 'Map(sleep)', 'TAG' : 'None'}
+ labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_2'}
assert_has_monitoring_info(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
labels = {'PTRANSFORM' : 'Map(sleep)'}
@@ -710,18 +787,12 @@ class FnApiRunnerTest(unittest.TestCase):
labels, ge_value=4 * DEFAULT_SAMPLING_PERIOD_MS)
# postgbk monitoring infos
- labels = {'PTRANSFORM' : 'GroupByKey/Read', 'TAG' : 'None'}
+ labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_6'}
assert_has_monitoring_info(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
- labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'None'}
+ labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_7'}
assert_has_monitoring_info(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
- labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'once'}
- assert_has_monitoring_info(
- postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
- labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'twice'}
- assert_has_monitoring_info(
- postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=2)
except:
print(res._monitoring_infos_by_stage)
raise
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 0eba87c..9455879 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -541,6 +541,7 @@ class BundleProcessor(object):
# ignores input name
input_op_by_target[
input_op.target.primitive_transform_reference] = input_op
+
for data_channel, expected_targets in data_channels.items():
for data in data_channel.input_elements(
instruction_id, expected_targets):
@@ -649,7 +650,44 @@ class BundleProcessor(object):
for mi in op.monitoring_infos(transform_id).values():
fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
- return list(all_monitoring_infos_dict.values())
+
+ infos_list = list(all_monitoring_infos_dict.values())
+
+ def inject_pcollection_into_element_count(monitoring_info):
+ """
+ If the provided metric is an element count metric, finds the relevant
+ transform output info in the current process_bundle_descriptor and adds
+ a label with PCOLLECTION_LABEL and the pcollection id to the monitoring
+ info.
+ """
+ if monitoring_info.urn == monitoring_infos.ELEMENT_COUNT_URN:
+ if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
+ return
+ ptransform_label = monitoring_info.labels[
+ monitoring_infos.PTRANSFORM_LABEL]
+ if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
+ return
+ tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
+
+ if not ptransform_label in self.process_bundle_descriptor.transforms:
+ return
+ if not tag_label in self.process_bundle_descriptor.transforms[
+ ptransform_label].outputs:
+ return
+
+ pcollection_name = (self.process_bundle_descriptor
+ .transforms[ptransform_label].outputs[tag_label])
+ monitoring_info.labels[
+ monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
+
+ # Clean up labels that are not in the specification.
+ monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
+ monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
+
+ for mi in infos_list:
+ inject_pcollection_into_element_count(mi)
+
+ return infos_list
def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
actual_output_tags = list(