You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 22:30:51 UTC
[beam] branch master updated: [BEAM-4374] Emit SampledByteCount
distribution tuple system metric from Python SDK (@Ardagan co-contributed)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21f61dd [BEAM-4374] Emit SampledByteCount distribution tuple system metric from Python SDK (@Ardagan co-contributed)
new d4afbab Merge pull request #8062 from ajamato/mean_byte_count
21f61dd is described below
commit 21f61dd1ef1d934a474e0fd05343dc83a09d89b2
Author: Alex Amato <aj...@google.com>
AuthorDate: Wed Mar 13 17:56:17 2019 -0700
[BEAM-4374] Emit SampledByteCount distribution tuple system metric from
Python SDK (@Ardagan co-contributed)
---
model/pipeline/src/main/proto/metrics.proto | 13 +
.../python/apache_beam/metrics/monitoring_infos.py | 50 ++-
sdks/python/apache_beam/pipeline.py | 6 +-
.../runners/portability/fn_api_runner_test.py | 382 +++++++++++++++------
.../apache_beam/runners/worker/bundle_processor.py | 9 +-
.../apache_beam/runners/worker/opcounters.py | 3 +-
.../apache_beam/runners/worker/opcounters_test.py | 4 +-
.../apache_beam/runners/worker/operations.pxd | 2 +-
.../apache_beam/runners/worker/operations.py | 53 ++-
9 files changed, 386 insertions(+), 136 deletions(-)
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 3b45efb..7d41d57 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -81,6 +81,19 @@ message MonitoringInfoSpecs {
} ]
}];
+  SAMPLED_BYTE_SIZE = 7 [(monitoring_info_spec) = {
+    urn: "beam:metric:sampled_byte_size:v1",
+    type_urn: "beam:metrics:distribution_int_64",
+    required_labels: [ "PCOLLECTION" ],
+    annotations: [ {
+      key: "description",
+      // Adjacent string literals are concatenated verbatim, so only the end
+      // of each piece carries the separating space (no leading spaces).
+      value: "The total byte size and count of a sampled "
+             "set (or all) of elements in the pcollection. Sampling is used "
+             "because calculating the byte count involves serializing the "
+             "elements which is CPU intensive."
+    } ]
+  }];
+
START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
type_urn: "beam:metrics:sum_int_64",
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 76f3b45..0e73461 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -31,10 +31,10 @@ from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.cells import GaugeData
from apache_beam.metrics.cells import GaugeResult
from apache_beam.portability import common_urns
-from apache_beam.portability.api.metrics_pb2 import CounterData
-from apache_beam.portability.api.metrics_pb2 import Metric
-from apache_beam.portability.api.metrics_pb2 import MonitoringInfo
+from apache_beam.portability.api import metrics_pb2
+SAMPLED_BYTE_SIZE_URN = (
+ common_urns.monitoring_info_specs.SAMPLED_BYTE_SIZE.spec.urn)
ELEMENT_COUNT_URN = common_urns.monitoring_info_specs.ELEMENT_COUNT.spec.urn
START_BUNDLE_MSECS_URN = (
common_urns.monitoring_info_specs.START_BUNDLE_MSECS.spec.urn)
@@ -141,8 +141,8 @@ def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
name=name)
if isinstance(metric, int):
- metric = Metric(
- counter_data=CounterData(
+ metric = metrics_pb2.Metric(
+ counter_data=metrics_pb2.CounterData(
int64_value=metric
)
)
@@ -162,8 +162,8 @@ def int64_counter(urn, metric, ptransform=None, tag=None):
"""
labels = create_labels(ptransform=ptransform, tag=tag)
if isinstance(metric, int):
- metric = Metric(
- counter_data=CounterData(
+ metric = metrics_pb2.Metric(
+ counter_data=metrics_pb2.CounterData(
int64_value=metric
)
)
@@ -186,6 +186,20 @@ def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
DISTRIBUTION_INT64_TYPE, metric, labels)
+def int64_distribution(urn, metric, ptransform=None, tag=None):
+ """Return a distribution monitoring info for the URN, metric and labels.
+
+ Args:
+ urn: The URN of the monitoring info/metric.
+ metric: The metric proto field to use in the monitoring info.
+ ptransform: The ptransform/step name used as a label.
+ tag: The output tag name, used as a label.
+ """
+ labels = create_labels(ptransform=ptransform, tag=tag)
+ return create_monitoring_info(
+ urn, DISTRIBUTION_INT64_TYPE, metric, labels)
+
+
def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
"""Return the gauge monitoring info for the URN, metric and labels.
@@ -214,7 +228,7 @@ def create_monitoring_info(urn, type_urn, metric_proto, labels=None):
Or an int value.
labels: The label dictionary to use in the MonitoringInfo.
"""
- return MonitoringInfo(
+ return metrics_pb2.MonitoringInfo(
urn=urn,
type=type_urn,
labels=labels or dict(),
@@ -296,12 +310,24 @@ def to_key(monitoring_info_proto):
return frozenset(key_items)
+def distribution_combiner(metric_a, metric_b):
+ a_data = metric_a.distribution_data.int_distribution_data
+ b_data = metric_b.distribution_data.int_distribution_data
+ return metrics_pb2.Metric(
+ distribution_data=metrics_pb2.DistributionData(
+ int_distribution_data=metrics_pb2.IntDistributionData(
+ count=a_data.count + b_data.count,
+ sum=a_data.sum + b_data.sum,
+ min=min(a_data.min, b_data.min),
+ max=max(a_data.max, b_data.max))))
+
+
_KNOWN_COMBINERS = {
- SUM_INT64_TYPE: lambda a, b: Metric(
- counter_data=CounterData(
+ SUM_INT64_TYPE: lambda a, b: metrics_pb2.Metric(
+ counter_data=metrics_pb2.CounterData(
int64_value=
a.counter_data.int64_value + b.counter_data.int64_value)),
- # TODO: Distributions, etc.
+ DISTRIBUTION_INT64_TYPE: distribution_combiner,
}
@@ -324,7 +350,7 @@ def consolidate(metrics, key=to_key):
if combiner:
def merge(a, b):
# pylint: disable=cell-var-from-loop
- return MonitoringInfo(
+ return metrics_pb2.MonitoringInfo(
urn=a.urn,
type=a.type,
labels=dict((label, value) for label, value in a.labels.items()
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8f0fa10..f56353b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -860,15 +860,17 @@ class AppliedPTransform(object):
return None
else:
return transform.to_runner_api(context, has_parts=bool(self.parts))
+ # Iterate over inputs and outputs by sorted key order, so that ids are
+ # consistently generated for multiple runs of the same pipeline.
return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
spec=transform_to_runner_api(self.transform, context),
subtransforms=[context.transforms.get_id(part, label=part.full_label)
for part in self.parts],
inputs={tag: context.pcollections.get_id(pc)
- for tag, pc in self.named_inputs().items()},
+ for tag, pc in sorted(self.named_inputs().items())},
outputs={str(tag): context.pcollections.get_id(out)
- for tag, out in self.named_outputs().items()},
+ for tag, out in sorted(self.named_outputs().items())},
# TODO(BEAM-115): display_data
display_data=None)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 5049ff4..11950c9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -31,6 +31,9 @@ import unittest
import uuid
from builtins import range
+import hamcrest
+from hamcrest.core.matcher import Matcher
+from hamcrest.core.string_description import StringDescription
from tenacity import retry
from tenacity import stop_after_attempt
@@ -56,6 +59,23 @@ else:
DEFAULT_SAMPLING_PERIOD_MS = 0
+def _matcher_or_equal_to(value_or_matcher):
+ """Pass-thru for matchers, and wraps value inputs in an equal_to matcher."""
+ if value_or_matcher is None:
+ return None
+ if isinstance(value_or_matcher, Matcher):
+ return value_or_matcher
+ return hamcrest.equal_to(value_or_matcher)
+
+
+def has_urn_and_labels(mi, urn, labels):
+  """Returns true if the monitoring_info has the given urn and labels.
+
+  Args:
+    mi: the MonitoringInfo proto to inspect.
+    urn: the URN that mi.urn must equal.
+    labels: dict of label name -> value; every pair must be present in
+      mi.labels with exactly that value.
+  """
+  if mi.urn != urn:
+    return False
+  # Test key membership before indexing: reading a missing key from a
+  # protobuf map field inserts a default entry, so `mi.labels[key]` (or
+  # `(key, value) in mi.labels.items()`, which indexes internally) would
+  # silently add empty labels to the inspected monitoring_info.
+  return all(key in mi.labels and mi.labels[key] == value
+             for key, value in labels.items())
+
+
class FnApiRunnerTest(unittest.TestCase):
def create_pipeline(self):
@@ -610,6 +630,121 @@ class FnApiRunnerTest(unittest.TestCase):
self.assertEqual(dist.committed.mean, 2.0)
self.assertEqual(gaug.committed.value, 3)
+ def test_callbacks_with_exception(self):
+ elements_list = ['1', '2']
+
+ def raise_expetion():
+ raise Exception('raise exception when calling callback')
+
+ class FinalizebleDoFnWithException(beam.DoFn):
+
+ def process(
+ self,
+ element,
+ bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+ bundle_finalizer.register(raise_expetion)
+ yield element
+
+ with self.create_pipeline() as p:
+ res = (p
+ | beam.Create(elements_list)
+ | beam.ParDo(FinalizebleDoFnWithException()))
+ assert_that(res, equal_to(['1', '2']))
+
+ def test_register_finalizations(self):
+ event_recorder = EventRecorder(tempfile.gettempdir())
+ elements_list = ['2', '1']
+
+ class FinalizableDoFn(beam.DoFn):
+ def process(
+ self,
+ element,
+ bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+ bundle_finalizer.register(lambda: event_recorder.record(element))
+ yield element
+
+ with self.create_pipeline() as p:
+ res = (p
+ | beam.Create(elements_list)
+ | beam.ParDo(FinalizableDoFn()))
+
+ assert_that(res, equal_to(elements_list))
+
+ results = event_recorder.events()
+ event_recorder.cleanup()
+ self.assertEqual(results, sorted(elements_list))
+
+
+# These tests are kept in a separate group so that they are
+# not run in FnApiRunnerTestWithBundleRepeat, which repeats
+# bundle processing. Repeating bundles breaks the byte sampling
+# metrics: the sampling counter is incremented unnecessarily on
+# every repeat, which makes the probability of sampling far too
+# small.
+class FnApiRunnerMetricsTest(unittest.TestCase):
+
+  def assert_has_counter(
+      self, monitoring_infos, urn, labels, value=None, ge_value=None):
+    """Asserts that exactly one counter MonitoringInfo matches.
+
+    Args:
+      monitoring_infos: iterable of MonitoringInfo protos to search.
+      urn: the URN the monitoring info must have.
+      labels: dict of labels that must all be present with these values.
+      value: if not None, the counter's int64 value must equal this.
+      ge_value: if not None, the counter's int64 value must be >= this.
+        Checked instead of `value` when both are given.
+    """
+    # TODO(ajamato): Consider adding a matcher framework
+    found = 0
+    for mi in monitoring_infos:
+      if has_urn_and_labels(mi, urn, labels):
+        counter_value = mi.metric.counter_data.int64_value
+        if ge_value is not None:
+          if counter_value >= ge_value:
+            found += 1
+        elif value is not None:
+          if counter_value == value:
+            found += 1
+        else:
+          found += 1
+    # Use `is not None` (matching the checks above) so a 0 expectation is
+    # still reported; ge_value can be 0 when the sampling period is 0.
+    ge_value_str = {'ge_value': ge_value} if ge_value is not None else ''
+    value_str = {'value': value} if value is not None else ''
+    self.assertEqual(
+        1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
+        (found, (urn, labels, value_str, ge_value_str),))
+
+ def assert_has_distribution(
+ self, monitoring_infos, urn, labels,
+ sum=None, count=None, min=None, max=None):
+ # TODO(ajamato): Consider adding a matcher framework
+ sum = _matcher_or_equal_to(sum)
+ count = _matcher_or_equal_to(count)
+ min = _matcher_or_equal_to(min)
+ max = _matcher_or_equal_to(max)
+ found = 0
+ description = StringDescription()
+ for mi in monitoring_infos:
+ if has_urn_and_labels(mi, urn, labels):
+ int_dist = mi.metric.distribution_data.int_distribution_data
+ increment = 1
+ if sum is not None:
+ description.append_text(' sum: ')
+ sum.describe_to(description)
+ if not sum.matches(int_dist.sum):
+ increment = 0
+ if count is not None:
+ description.append_text(' count: ')
+ count.describe_to(description)
+ if not count.matches(int_dist.count):
+ increment = 0
+ if min is not None:
+ description.append_text(' min: ')
+ min.describe_to(description)
+ if not min.matches(int_dist.min):
+ increment = 0
+ if max is not None:
+ description.append_text(' max: ')
+ max.describe_to(description)
+ if not max.matches(int_dist.max):
+ increment = 0
+ found += increment
+ self.assertEqual(
+ 1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
+ (found, (urn, labels, str(description)),))
+
+ def create_pipeline(self):
+ return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
+
def test_element_count_metrics(self):
class GenerateTwoOutputs(beam.DoFn):
def process(self, element):
@@ -618,9 +753,8 @@ class FnApiRunnerTest(unittest.TestCase):
yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
yield beam.pvalue.TaggedOutput('ThirdOutput', str(element) + '3')
- class PrintElements(beam.DoFn):
+ class PassThrough(beam.DoFn):
def process(self, element):
- logging.debug(element)
yield element
p = self.create_pipeline()
@@ -629,7 +763,9 @@ class FnApiRunnerTest(unittest.TestCase):
# internal way of accessing progress metrics.
self.skipTest('Metrics not supported.')
- pcoll = p | beam.Create(['a1', 'a2'])
+ # Produce enough elements to make sure byte sampling occurs.
+ num_source_elems = 100
+ pcoll = p | beam.Create(['a%d' % i for i in range(num_source_elems)])
# pylint: disable=expression-not-assigned
pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
@@ -642,43 +778,121 @@ class FnApiRunnerTest(unittest.TestCase):
# consume some of elements
merged = ((first_output, second_output, third_output) | beam.Flatten())
- merged | ('PrintingStep') >> beam.ParDo(PrintElements())
- second_output | ('PrintingStep2') >> beam.ParDo(PrintElements())
+ merged | ('PassThrough') >> beam.ParDo(PassThrough())
+ second_output | ('PassThrough2') >> beam.ParDo(PassThrough())
res = p.run()
res.wait_until_finish()
result_metrics = res.monitoring_metrics()
- def assert_contains_metric(src, urn, pcollection, value):
- for item in src:
- if item.urn == urn:
- if item.labels['PCOLLECTION'] == pcollection:
- self.assertEqual(item.metric.counter_data.int64_value, value,
- str(("Metric has incorrect value", value, item)))
- return
- self.fail(str(("Metric not found", urn, pcollection, src)))
-
counters = result_metrics.monitoring_infos()
+ # All element count and byte count metrics must have a PCOLLECTION_LABEL.
self.assertFalse([x for x in counters if
- x.urn == monitoring_infos.ELEMENT_COUNT_URN
+ x.urn in [monitoring_infos.ELEMENT_COUNT_URN,
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN]
and
monitoring_infos.PCOLLECTION_LABEL not in x.labels])
-
- assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
- 'Impulse', 1)
- assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
- 'ref_PCollection_PCollection_1', 2)
-
- # Skipping other pcollections due to non-deterministic naming for multiple
- # outputs.
-
- assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
- 'ref_PCollection_PCollection_5', 8)
- assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
- 'ref_PCollection_PCollection_6', 8)
- assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
- 'ref_PCollection_PCollection_7', 2)
+ try:
+ labels = {monitoring_infos.PCOLLECTION_LABEL : 'Impulse'}
+ self.assert_has_counter(
+ counters, monitoring_infos.ELEMENT_COUNT_URN, labels, 1)
+
+ # Create/Read, "out" output.
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_1'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # GenerateTwoOutputs, main output.
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_2'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # GenerateTwoOutputs, "SecondOutput" output.
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_3'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, 2 * num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # GenerateTwoOutputs, "ThirdOutput" output.
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_4'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # All pcollections are checked: ids are deterministic because
+ # to_runner_api assigns them in sorted input/output tag order.
+ # Flatten/Read, main output.
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_5'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # PassThrough, main output
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_6'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+
+ # PassThrough2, main output
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_7'}
+ self.assert_has_counter(
+ counters,
+ monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+ self.assert_has_distribution(
+ counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+ min=hamcrest.greater_than(0),
+ max=hamcrest.greater_than(0),
+ sum=hamcrest.greater_than(0),
+ count=hamcrest.greater_than(0))
+ except:
+ print(res._monitoring_infos_by_stage)
+ raise
def test_non_user_metrics(self):
p = self.create_pipeline()
@@ -746,9 +960,9 @@ class FnApiRunnerTest(unittest.TestCase):
res = p.run()
res.wait_until_finish()
- def has_mi_for_ptransform(monitoring_infos, ptransform):
- for mi in monitoring_infos:
- if ptransform in mi.labels['PTRANSFORM']:
+ def has_mi_for_ptransform(mon_infos, ptransform):
+ for mi in mon_infos:
+ if ptransform in mi.labels[monitoring_infos.PTRANSFORM_LABEL]:
return True
return False
@@ -799,96 +1013,44 @@ class FnApiRunnerTest(unittest.TestCase):
# The monitoring infos above are actually unordered. Swap.
pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
- def assert_has_monitoring_info(
- monitoring_infos, urn, labels, value=None, ge_value=None):
- def contains_labels(monitoring_info, labels):
- return len([x for x in labels.items() if
- x[0] in monitoring_info.labels and monitoring_info.labels[
- x[0]] == x[1]]) == len(labels)
-
- # TODO(ajamato): Consider adding a matcher framework
- found = 0
- for mi in monitoring_infos:
- if contains_labels(mi, labels) and mi.urn == urn:
- if (ge_value is not None and
- mi.metric.counter_data.int64_value >= ge_value):
- found = found + 1
- elif (value is not None and
- mi.metric.counter_data.int64_value == value):
- found = found + 1
- ge_value_str = {'ge_value' : ge_value} if ge_value else ''
- value_str = {'value' : value} if value else ''
- self.assertEqual(
- 1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
- (found, (urn, labels, value_str, ge_value_str),))
-
# pregbk monitoring infos
- labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_1'}
- assert_has_monitoring_info(
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_1'}
+ self.assert_has_counter(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
- labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_2'}
- assert_has_monitoring_info(
+ self.assert_has_distribution(
+ pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_2'}
+ self.assert_has_counter(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
- labels = {'PTRANSFORM' : 'Map(sleep)'}
- assert_has_monitoring_info(
+ self.assert_has_distribution(
+ pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+ labels = {monitoring_infos.PTRANSFORM_LABEL : 'Map(sleep)'}
+ self.assert_has_counter(
pregbk_mis, monitoring_infos.TOTAL_MSECS_URN,
labels, ge_value=4 * DEFAULT_SAMPLING_PERIOD_MS)
# postgbk monitoring infos
- labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_6'}
- assert_has_monitoring_info(
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_6'}
+ self.assert_has_counter(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
- labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_7'}
- assert_has_monitoring_info(
+ self.assert_has_distribution(
+ postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+ labels = {monitoring_infos.PCOLLECTION_LABEL :
+ 'ref_PCollection_PCollection_7'}
+ self.assert_has_counter(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
+ self.assert_has_distribution(
+ postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
except:
print(res._monitoring_infos_by_stage)
raise
- def test_callbacks_with_exception(self):
- elements_list = ['1', '2']
-
- def raise_expetion():
- raise Exception('raise exception when calling callback')
-
- class FinalizebleDoFnWithException(beam.DoFn):
-
- def process(
- self,
- element,
- bundle_finalizer=beam.DoFn.BundleFinalizerParam):
- bundle_finalizer.register(raise_expetion)
- yield element
-
- with self.create_pipeline() as p:
- res = (p
- | beam.Create(elements_list)
- | beam.ParDo(FinalizebleDoFnWithException()))
- assert_that(res, equal_to(['1', '2']))
-
- def test_register_finalizations(self):
- event_recorder = EventRecorder(tempfile.gettempdir())
- elements_list = ['2', '1']
-
- class FinalizableDoFn(beam.DoFn):
- def process(
- self,
- element,
- bundle_finalizer=beam.DoFn.BundleFinalizerParam):
- bundle_finalizer.register(lambda: event_recorder.record(element))
- yield element
-
- with self.create_pipeline() as p:
- res = (p
- | beam.Create(elements_list)
- | beam.ParDo(FinalizableDoFn()))
-
- assert_that(res, equal_to(elements_list))
-
- results = event_recorder.events()
- event_recorder.cleanup()
- self.assertEqual(results, sorted(elements_list))
-
class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1437508..24ac478 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -66,6 +66,8 @@ IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1'
# TODO(vikasrk): Fix this once runner sends appropriate common_urns.
OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'urn:beam:dofn:javasdk:0.1'
OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1'
+URNS_NEEDING_PCOLLECTIONS = set([monitoring_infos.ELEMENT_COUNT_URN,
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN])
class RunnerIOOperation(operations.Operation):
@@ -698,14 +700,14 @@ class BundleProcessor(object):
infos_list = list(all_monitoring_infos_dict.values())
- def inject_pcollection_into_element_count(monitoring_info):
+ def inject_pcollection(monitoring_info):
"""
If provided metric is element count metric:
Finds relevant transform output info in current process_bundle_descriptor
and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
info.
"""
- if monitoring_info.urn == monitoring_infos.ELEMENT_COUNT_URN:
+ if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
return
ptransform_label = monitoring_info.labels[
@@ -722,6 +724,7 @@ class BundleProcessor(object):
pcollection_name = (self.process_bundle_descriptor
.transforms[ptransform_label].outputs[tag_label])
+
monitoring_info.labels[
monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
@@ -730,7 +733,7 @@ class BundleProcessor(object):
monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
for mi in infos_list:
- inject_pcollection_into_element_count(mi)
+ inject_pcollection(mi)
return infos_list
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 33f028f..ae36a6b 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -182,7 +182,8 @@ class OperationCounters(object):
self.element_counter = counter_factory.get_counter(
'%s-out%s-ElementCount' % (step_name, output_index), Counter.SUM)
self.mean_byte_counter = counter_factory.get_counter(
- '%s-out%s-MeanByteCount' % (step_name, output_index), Counter.MEAN)
+ '%s-out%s-MeanByteCount' % (step_name, output_index),
+ Counter.BEAM_DISTRIBUTION)
self.coder_impl = coder.get_impl() if coder else None
self.active_accumulator = None
self.current_size = None
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index 511b9b2..e850f6d 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -98,9 +98,9 @@ class OperationCountersTest(unittest.TestCase):
self.assertEqual(expected_elements, opcounts.element_counter.value())
if expected_size is not None:
if math.isnan(expected_size):
- self.assertTrue(math.isnan(opcounts.mean_byte_counter.value()))
+ self.assertTrue(math.isnan(opcounts.mean_byte_counter.value()[0]))
else:
- self.assertEqual(expected_size, opcounts.mean_byte_counter.value())
+ self.assertEqual(expected_size, opcounts.mean_byte_counter.value()[0])
def test_update_int(self):
opcounts = OperationCounters(CounterFactory(), 'some-name',
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index a947043..5743dec 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -71,7 +71,7 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
- cpdef element_count_monitoring_infos(self, transform_id)
+ cpdef pcollection_count_monitoring_infos(self, transform_id)
cpdef monitoring_infos(self, transform_id)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index de47cdd..1484c6e 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -36,6 +36,7 @@ from apache_beam.io import iobase
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import metrics_pb2
from apache_beam.runners import common
from apache_beam.runners.common import Receiver
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -272,23 +273,45 @@ class Operation(object):
"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
- self.element_count_monitoring_infos(transform_id))
+ self.pcollection_count_monitoring_infos(transform_id))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos
- def element_count_monitoring_infos(self, transform_id):
+ def pcollection_count_monitoring_infos(self, transform_id):
"""Returns the element count MonitoringInfo collected by this operation."""
if len(self.receivers) == 1:
# If there is exactly one output, we can unambiguously
# fix its name later, which we do.
# TODO(robertwb): Plumb the actual name here.
- mi = monitoring_infos.int64_counter(
+ elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
self.receivers[0].opcounter.element_counter.value(),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
)
- return {monitoring_infos.to_key(mi) : mi}
+
+ (unused_mean, sum, count, min, max) = (
+ self.receivers[0].opcounter.mean_byte_counter.value())
+ metric = metrics_pb2.Metric(
+ distribution_data=metrics_pb2.DistributionData(
+ int_distribution_data=metrics_pb2.IntDistributionData(
+ count=count,
+ sum=sum,
+ min=min,
+ max=max
+ )
+ )
+ )
+ sampled_byte_count = monitoring_infos.int64_distribution(
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+ metric,
+ ptransform=transform_id,
+ tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
+ )
+ return {
+ monitoring_infos.to_key(elem_count_mi) : elem_count_mi,
+ monitoring_infos.to_key(sampled_byte_count) : sampled_byte_count
+ }
return {}
def user_monitoring_infos(self, transform_id):
@@ -609,6 +632,25 @@ class DoOperation(Operation):
tag=str(tag)
)
infos[monitoring_infos.to_key(mi)] = mi
+ (unused_mean, sum, count, min, max) = (
+ receiver.opcounter.mean_byte_counter.value())
+ metric = metrics_pb2.Metric(
+ distribution_data=metrics_pb2.DistributionData(
+ int_distribution_data=metrics_pb2.IntDistributionData(
+ count=count,
+ sum=sum,
+ min=min,
+ max=max
+ )
+ )
+ )
+ sampled_byte_count = monitoring_infos.int64_distribution(
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+ metric,
+ ptransform=transform_id,
+ tag=str(tag)
+ )
+ infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
return infos
@@ -666,7 +708,8 @@ class SdfProcessSizedElements(DoOperation):
for receiver in self.tagged_receivers.values():
elements = receiver.opcounter.element_counter.value()
if elements > 0:
- total += elements * receiver.opcounter.mean_byte_counter.value()
+ mean = (receiver.opcounter.mean_byte_counter.value())[0]
+ total += elements * mean
return total