You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 22:30:51 UTC

[beam] branch master updated: [BEAM-4374] Emit SampledByteCount distribution tuple system metric from Python SDK (@Ardagan co-contributed)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f61dd  [BEAM-4374] Emit SampledByteCount distribution tuple system metric from Python SDK (@Ardagan co-contributed)
     new d4afbab  Merge pull request #8062 from ajamato/mean_byte_count
21f61dd is described below

commit 21f61dd1ef1d934a474e0fd05343dc83a09d89b2
Author: Alex Amato <aj...@google.com>
AuthorDate: Wed Mar 13 17:56:17 2019 -0700

    [BEAM-4374] Emit SampledByteCount distribution tuple system metric from
    Python SDK (@Ardagan co-contributed)
---
 model/pipeline/src/main/proto/metrics.proto        |  13 +
 .../python/apache_beam/metrics/monitoring_infos.py |  50 ++-
 sdks/python/apache_beam/pipeline.py                |   6 +-
 .../runners/portability/fn_api_runner_test.py      | 382 +++++++++++++++------
 .../apache_beam/runners/worker/bundle_processor.py |   9 +-
 .../apache_beam/runners/worker/opcounters.py       |   3 +-
 .../apache_beam/runners/worker/opcounters_test.py  |   4 +-
 .../apache_beam/runners/worker/operations.pxd      |   2 +-
 .../apache_beam/runners/worker/operations.py       |  53 ++-
 9 files changed, 386 insertions(+), 136 deletions(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 3b45efb..7d41d57 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -81,6 +81,19 @@ message MonitoringInfoSpecs {
       } ]
     }];
 
+    SAMPLED_BYTE_SIZE = 7 [(monitoring_info_spec) = {
+      urn: "beam:metric:sampled_byte_size:v1",
+      type_urn: "beam:metrics:distribution_int_64",
+      required_labels: [ "PCOLLECTION" ],
+      annotations: [ {
+        key: "description",
+        value: "The total byte size and count of a sampled "
+               " set (or all) of elements in the pcollection. Sampling is used "
+               " because calculating the byte count involves serializing the "
+               " elements which is CPU intensive."
+      } ]
+    }];
+
     START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
       urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
       type_urn: "beam:metrics:sum_int_64",
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 76f3b45..0e73461 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -31,10 +31,10 @@ from apache_beam.metrics.cells import DistributionResult
 from apache_beam.metrics.cells import GaugeData
 from apache_beam.metrics.cells import GaugeResult
 from apache_beam.portability import common_urns
-from apache_beam.portability.api.metrics_pb2 import CounterData
-from apache_beam.portability.api.metrics_pb2 import Metric
-from apache_beam.portability.api.metrics_pb2 import MonitoringInfo
+from apache_beam.portability.api import metrics_pb2
 
+SAMPLED_BYTE_SIZE_URN = (
+    common_urns.monitoring_info_specs.SAMPLED_BYTE_SIZE.spec.urn)
 ELEMENT_COUNT_URN = common_urns.monitoring_info_specs.ELEMENT_COUNT.spec.urn
 START_BUNDLE_MSECS_URN = (
     common_urns.monitoring_info_specs.START_BUNDLE_MSECS.spec.urn)
@@ -141,8 +141,8 @@ def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
   labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
                          name=name)
   if isinstance(metric, int):
-    metric = Metric(
-        counter_data=CounterData(
+    metric = metrics_pb2.Metric(
+        counter_data=metrics_pb2.CounterData(
             int64_value=metric
         )
     )
@@ -162,8 +162,8 @@ def int64_counter(urn, metric, ptransform=None, tag=None):
   """
   labels = create_labels(ptransform=ptransform, tag=tag)
   if isinstance(metric, int):
-    metric = Metric(
-        counter_data=CounterData(
+    metric = metrics_pb2.Metric(
+        counter_data=metrics_pb2.CounterData(
             int64_value=metric
         )
     )
@@ -186,6 +186,20 @@ def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
                                 DISTRIBUTION_INT64_TYPE, metric, labels)
 
 
+def int64_distribution(urn, metric, ptransform=None, tag=None):
+  """Return a distribution monitoring info for the URN, metric and labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    metric: The metric proto field to use in the monitoring info.
+    ptransform: The ptransform/step name used as a label.
+    tag: The output tag name, used as a label.
+  """
+  labels = create_labels(ptransform=ptransform, tag=tag)
+  return create_monitoring_info(
+      urn, DISTRIBUTION_INT64_TYPE, metric, labels)
+
+
 def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
   """Return the gauge monitoring info for the URN, metric and labels.
 
@@ -214,7 +228,7 @@ def create_monitoring_info(urn, type_urn, metric_proto, labels=None):
         Or an int value.
     labels: The label dictionary to use in the MonitoringInfo.
   """
-  return MonitoringInfo(
+  return metrics_pb2.MonitoringInfo(
       urn=urn,
       type=type_urn,
       labels=labels or dict(),
@@ -296,12 +310,24 @@ def to_key(monitoring_info_proto):
   return frozenset(key_items)
 
 
+def distribution_combiner(metric_a, metric_b):
+  a_data = metric_a.distribution_data.int_distribution_data
+  b_data = metric_b.distribution_data.int_distribution_data
+  return metrics_pb2.Metric(
+      distribution_data=metrics_pb2.DistributionData(
+          int_distribution_data=metrics_pb2.IntDistributionData(
+              count=a_data.count + b_data.count,
+              sum=a_data.sum + b_data.sum,
+              min=min(a_data.min, b_data.min),
+              max=max(a_data.max, b_data.max))))
+
+
 _KNOWN_COMBINERS = {
-    SUM_INT64_TYPE: lambda a, b: Metric(
-        counter_data=CounterData(
+    SUM_INT64_TYPE: lambda a, b: metrics_pb2.Metric(
+        counter_data=metrics_pb2.CounterData(
             int64_value=
             a.counter_data.int64_value + b.counter_data.int64_value)),
-    # TODO: Distributions, etc.
+    DISTRIBUTION_INT64_TYPE: distribution_combiner,
 }
 
 
@@ -324,7 +350,7 @@ def consolidate(metrics, key=to_key):
       if combiner:
         def merge(a, b):
           # pylint: disable=cell-var-from-loop
-          return MonitoringInfo(
+          return metrics_pb2.MonitoringInfo(
               urn=a.urn,
               type=a.type,
               labels=dict((label, value) for label, value in a.labels.items()
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8f0fa10..f56353b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -860,15 +860,17 @@ class AppliedPTransform(object):
         return None
       else:
         return transform.to_runner_api(context, has_parts=bool(self.parts))
+    # Iterate over inputs and outputs by sorted key order, so that ids are
+    # consistently generated for multiple runs of the same pipeline.
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
         spec=transform_to_runner_api(self.transform, context),
         subtransforms=[context.transforms.get_id(part, label=part.full_label)
                        for part in self.parts],
         inputs={tag: context.pcollections.get_id(pc)
-                for tag, pc in self.named_inputs().items()},
+                for tag, pc in sorted(self.named_inputs().items())},
         outputs={str(tag): context.pcollections.get_id(out)
-                 for tag, out in self.named_outputs().items()},
+                 for tag, out in sorted(self.named_outputs().items())},
         # TODO(BEAM-115): display_data
         display_data=None)
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 5049ff4..11950c9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -31,6 +31,9 @@ import unittest
 import uuid
 from builtins import range
 
+import hamcrest
+from hamcrest.core.matcher import Matcher
+from hamcrest.core.string_description import StringDescription
 from tenacity import retry
 from tenacity import stop_after_attempt
 
@@ -56,6 +59,23 @@ else:
   DEFAULT_SAMPLING_PERIOD_MS = 0
 
 
+def _matcher_or_equal_to(value_or_matcher):
+  """Passes matchers through; wraps other values in an equal_to matcher."""
+  if value_or_matcher is None:
+    return None
+  if isinstance(value_or_matcher, Matcher):
+    return value_or_matcher
+  return hamcrest.equal_to(value_or_matcher)
+
+
+def has_urn_and_labels(mi, urn, labels):
+  """Returns true if the monitoring_info contains the labels and urn."""
+  def contains_labels(mi, labels):
+    # Check all the labels and their values exist in the monitoring_info
+    return all(item in mi.labels.items() for item in labels.items())
+  return contains_labels(mi, labels) and mi.urn == urn
+
+
 class FnApiRunnerTest(unittest.TestCase):
 
   def create_pipeline(self):
@@ -610,6 +630,121 @@ class FnApiRunnerTest(unittest.TestCase):
     self.assertEqual(dist.committed.mean, 2.0)
     self.assertEqual(gaug.committed.value, 3)
 
+  def test_callbacks_with_exception(self):
+    elements_list = ['1', '2']
+
+    def raise_expetion():
+      raise Exception('raise exception when calling callback')
+
+    class FinalizebleDoFnWithException(beam.DoFn):
+
+      def process(
+          self,
+          element,
+          bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+        bundle_finalizer.register(raise_expetion)
+        yield element
+
+    with self.create_pipeline() as p:
+      res = (p
+             | beam.Create(elements_list)
+             | beam.ParDo(FinalizebleDoFnWithException()))
+      assert_that(res, equal_to(['1', '2']))
+
+  def test_register_finalizations(self):
+    event_recorder = EventRecorder(tempfile.gettempdir())
+    elements_list = ['2', '1']
+
+    class FinalizableDoFn(beam.DoFn):
+      def process(
+          self,
+          element,
+          bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+        bundle_finalizer.register(lambda: event_recorder.record(element))
+        yield element
+
+    with self.create_pipeline() as p:
+      res = (p
+             | beam.Create(elements_list)
+             | beam.ParDo(FinalizableDoFn()))
+
+      assert_that(res, equal_to(elements_list))
+
+    results = event_recorder.events()
+    event_recorder.cleanup()
+    self.assertEqual(results, sorted(elements_list))
+
+
+# These tests are kept in a separate group so that they are
+# not run in the FnApiRunnerTestWithBundleRepeat, which repeats
+# bundle processing. This breaks the byte sampling metrics as
+# it makes the probability of sampling far too small
+# upon repeating bundle processing due to unnecessarily incrementing
+# the sampling counter.
+class FnApiRunnerMetricsTest(unittest.TestCase):
+
+  def assert_has_counter(
+      self, monitoring_infos, urn, labels, value=None, ge_value=None):
+    # TODO(ajamato): Consider adding a matcher framework
+    found = 0
+    for mi in monitoring_infos:
+      if has_urn_and_labels(mi, urn, labels):
+        if ge_value is not None:
+          if mi.metric.counter_data.int64_value >= ge_value:
+            found = found + 1
+        elif value is not None:
+          if mi.metric.counter_data.int64_value == value:
+            found = found + 1
+        else:
+          found = found + 1
+    ge_value_str = {'ge_value' : ge_value} if ge_value else ''
+    value_str = {'value' : value} if value else ''
+    self.assertEqual(
+        1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
+        (found, (urn, labels, value_str, ge_value_str),))
+
+  def assert_has_distribution(
+      self, monitoring_infos, urn, labels,
+      sum=None, count=None, min=None, max=None):
+    # TODO(ajamato): Consider adding a matcher framework
+    sum = _matcher_or_equal_to(sum)
+    count = _matcher_or_equal_to(count)
+    min = _matcher_or_equal_to(min)
+    max = _matcher_or_equal_to(max)
+    found = 0
+    description = StringDescription()
+    for mi in monitoring_infos:
+      if has_urn_and_labels(mi, urn, labels):
+        int_dist = mi.metric.distribution_data.int_distribution_data
+        increment = 1
+        if sum is not None:
+          description.append_text(' sum: ')
+          sum.describe_to(description)
+          if not sum.matches(int_dist.sum):
+            increment = 0
+        if count is not None:
+          description.append_text(' count: ')
+          count.describe_to(description)
+          if not count.matches(int_dist.count):
+            increment = 0
+        if min is not None:
+          description.append_text(' min: ')
+          min.describe_to(description)
+          if not min.matches(int_dist.min):
+            increment = 0
+        if max is not None:
+          description.append_text(' max: ')
+          max.describe_to(description)
+          if not max.matches(int_dist.max):
+            increment = 0
+        found += increment
+    self.assertEqual(
+        1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
+        (found, (urn, labels, str(description)),))
+
+  def create_pipeline(self):
+    return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
+
   def test_element_count_metrics(self):
     class GenerateTwoOutputs(beam.DoFn):
       def process(self, element):
@@ -618,9 +753,8 @@ class FnApiRunnerTest(unittest.TestCase):
         yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
         yield beam.pvalue.TaggedOutput('ThirdOutput', str(element) + '3')
 
-    class PrintElements(beam.DoFn):
+    class PassThrough(beam.DoFn):
       def process(self, element):
-        logging.debug(element)
         yield element
 
     p = self.create_pipeline()
@@ -629,7 +763,9 @@ class FnApiRunnerTest(unittest.TestCase):
       # internal way of accessing progress metrics.
       self.skipTest('Metrics not supported.')
 
-    pcoll = p | beam.Create(['a1', 'a2'])
+    # Produce enough elements to make sure byte sampling occurs.
+    num_source_elems = 100
+    pcoll = p | beam.Create(['a%d' % i for i in range(num_source_elems)])
 
     # pylint: disable=expression-not-assigned
     pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
@@ -642,43 +778,121 @@ class FnApiRunnerTest(unittest.TestCase):
 
     # consume some of elements
     merged = ((first_output, second_output, third_output) | beam.Flatten())
-    merged | ('PrintingStep') >> beam.ParDo(PrintElements())
-    second_output | ('PrintingStep2') >> beam.ParDo(PrintElements())
+    merged | ('PassThrough') >> beam.ParDo(PassThrough())
+    second_output | ('PassThrough2') >> beam.ParDo(PassThrough())
 
     res = p.run()
     res.wait_until_finish()
 
     result_metrics = res.monitoring_metrics()
 
-    def assert_contains_metric(src, urn, pcollection, value):
-      for item in src:
-        if item.urn == urn:
-          if item.labels['PCOLLECTION'] == pcollection:
-            self.assertEqual(item.metric.counter_data.int64_value, value,
-                             str(("Metric has incorrect value", value, item)))
-            return
-      self.fail(str(("Metric not found", urn, pcollection, src)))
-
     counters = result_metrics.monitoring_infos()
+    # All element count and byte count metrics must have a PCOLLECTION_LABEL.
     self.assertFalse([x for x in counters if
-                      x.urn == monitoring_infos.ELEMENT_COUNT_URN
+                      x.urn in [monitoring_infos.ELEMENT_COUNT_URN,
+                                monitoring_infos.SAMPLED_BYTE_SIZE_URN]
                       and
                       monitoring_infos.PCOLLECTION_LABEL not in x.labels])
-
-    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
-                           'Impulse', 1)
-    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
-                           'ref_PCollection_PCollection_1', 2)
-
-    # Skipping other pcollections due to non-deterministic naming for multiple
-    # outputs.
-
-    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
-                           'ref_PCollection_PCollection_5', 8)
-    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
-                           'ref_PCollection_PCollection_6', 8)
-    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
-                           'ref_PCollection_PCollection_7', 2)
+    try:
+      labels = {monitoring_infos.PCOLLECTION_LABEL : 'Impulse'}
+      self.assert_has_counter(
+          counters, monitoring_infos.ELEMENT_COUNT_URN, labels, 1)
+
+      # Create/Read, "out" output.
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_1'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # GenerateTwoOutputs, main output.
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_2'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # GenerateTwoOutputs, "SecondOutput" output.
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_3'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, 2 * num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # GenerateTwoOutputs, "ThirdOutput" output.
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_4'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # Skipping other pcollections due to non-deterministic naming for multiple
+      # outputs.
+      # Flatten/Read, main output.
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_5'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # PassThrough, main output
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_6'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+
+      # PassThrough2, main output
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                    'ref_PCollection_PCollection_7'}
+      self.assert_has_counter(
+          counters,
+          monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
+      self.assert_has_distribution(
+          counters, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels,
+          min=hamcrest.greater_than(0),
+          max=hamcrest.greater_than(0),
+          sum=hamcrest.greater_than(0),
+          count=hamcrest.greater_than(0))
+    except:
+      print(res._monitoring_infos_by_stage)
+      raise
 
   def test_non_user_metrics(self):
     p = self.create_pipeline()
@@ -746,9 +960,9 @@ class FnApiRunnerTest(unittest.TestCase):
     res = p.run()
     res.wait_until_finish()
 
-    def has_mi_for_ptransform(monitoring_infos, ptransform):
-      for mi in monitoring_infos:
-        if ptransform in mi.labels['PTRANSFORM']:
+    def has_mi_for_ptransform(mon_infos, ptransform):
+      for mi in mon_infos:
+        if ptransform in mi.labels[monitoring_infos.PTRANSFORM_LABEL]:
           return True
       return False
 
@@ -799,96 +1013,44 @@ class FnApiRunnerTest(unittest.TestCase):
         # The monitoring infos above are actually unordered. Swap.
         pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
 
-      def assert_has_monitoring_info(
-          monitoring_infos, urn, labels, value=None, ge_value=None):
-        def contains_labels(monitoring_info, labels):
-          return len([x for x in labels.items() if
-                      x[0] in monitoring_info.labels and monitoring_info.labels[
-                          x[0]] == x[1]]) == len(labels)
-
-        # TODO(ajamato): Consider adding a matcher framework
-        found = 0
-        for mi in monitoring_infos:
-          if contains_labels(mi, labels) and mi.urn == urn:
-            if (ge_value is not None and
-                mi.metric.counter_data.int64_value >= ge_value):
-              found = found + 1
-            elif (value is not None and
-                  mi.metric.counter_data.int64_value == value):
-              found = found + 1
-        ge_value_str = {'ge_value' : ge_value} if ge_value else ''
-        value_str = {'value' : value} if value else ''
-        self.assertEqual(
-            1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
-            (found, (urn, labels, value_str, ge_value_str),))
-
       # pregbk monitoring infos
-      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_1'}
-      assert_has_monitoring_info(
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                'ref_PCollection_PCollection_1'}
+      self.assert_has_counter(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
-      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_2'}
-      assert_has_monitoring_info(
+      self.assert_has_distribution(
+          pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                'ref_PCollection_PCollection_2'}
+      self.assert_has_counter(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
-      labels = {'PTRANSFORM' : 'Map(sleep)'}
-      assert_has_monitoring_info(
+      self.assert_has_distribution(
+          pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+      labels = {monitoring_infos.PTRANSFORM_LABEL : 'Map(sleep)'}
+      self.assert_has_counter(
           pregbk_mis, monitoring_infos.TOTAL_MSECS_URN,
           labels, ge_value=4 * DEFAULT_SAMPLING_PERIOD_MS)
 
       # postgbk monitoring infos
-      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_6'}
-      assert_has_monitoring_info(
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                'ref_PCollection_PCollection_6'}
+      self.assert_has_counter(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
-      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_7'}
-      assert_has_monitoring_info(
+      self.assert_has_distribution(
+          postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
+
+      labels = {monitoring_infos.PCOLLECTION_LABEL :
+                'ref_PCollection_PCollection_7'}
+      self.assert_has_counter(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
+      self.assert_has_distribution(
+          postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
     except:
       print(res._monitoring_infos_by_stage)
       raise
 
-  def test_callbacks_with_exception(self):
-    elements_list = ['1', '2']
-
-    def raise_expetion():
-      raise Exception('raise exception when calling callback')
-
-    class FinalizebleDoFnWithException(beam.DoFn):
-
-      def process(
-          self,
-          element,
-          bundle_finalizer=beam.DoFn.BundleFinalizerParam):
-        bundle_finalizer.register(raise_expetion)
-        yield element
-
-    with self.create_pipeline() as p:
-      res = (p
-             | beam.Create(elements_list)
-             | beam.ParDo(FinalizebleDoFnWithException()))
-      assert_that(res, equal_to(['1', '2']))
-
-  def test_register_finalizations(self):
-    event_recorder = EventRecorder(tempfile.gettempdir())
-    elements_list = ['2', '1']
-
-    class FinalizableDoFn(beam.DoFn):
-      def process(
-          self,
-          element,
-          bundle_finalizer=beam.DoFn.BundleFinalizerParam):
-        bundle_finalizer.register(lambda: event_recorder.record(element))
-        yield element
-
-    with self.create_pipeline() as p:
-      res = (p
-             | beam.Create(elements_list)
-             | beam.ParDo(FinalizableDoFn()))
-
-      assert_that(res, equal_to(elements_list))
-
-    results = event_recorder.events()
-    event_recorder.cleanup()
-    self.assertEqual(results, sorted(elements_list))
-
 
 class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1437508..24ac478 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -66,6 +66,8 @@ IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1'
 # TODO(vikasrk): Fix this once runner sends appropriate common_urns.
 OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'urn:beam:dofn:javasdk:0.1'
 OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1'
+URNS_NEEDING_PCOLLECTIONS = set([monitoring_infos.ELEMENT_COUNT_URN,
+                                 monitoring_infos.SAMPLED_BYTE_SIZE_URN])
 
 
 class RunnerIOOperation(operations.Operation):
@@ -698,14 +700,14 @@ class BundleProcessor(object):
 
     infos_list = list(all_monitoring_infos_dict.values())
 
-    def inject_pcollection_into_element_count(monitoring_info):
+    def inject_pcollection(monitoring_info):
       """
       If provided metric is element count metric:
       Finds relevant transform output info in current process_bundle_descriptor
       and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
       info.
       """
-      if monitoring_info.urn == monitoring_infos.ELEMENT_COUNT_URN:
+      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
         if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
           return
         ptransform_label = monitoring_info.labels[
@@ -722,6 +724,7 @@ class BundleProcessor(object):
 
         pcollection_name = (self.process_bundle_descriptor
                             .transforms[ptransform_label].outputs[tag_label])
+
         monitoring_info.labels[
             monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
 
@@ -730,7 +733,7 @@ class BundleProcessor(object):
         monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
 
     for mi in infos_list:
-      inject_pcollection_into_element_count(mi)
+      inject_pcollection(mi)
 
     return infos_list
 
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 33f028f..ae36a6b 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -182,7 +182,8 @@ class OperationCounters(object):
     self.element_counter = counter_factory.get_counter(
         '%s-out%s-ElementCount' % (step_name, output_index), Counter.SUM)
     self.mean_byte_counter = counter_factory.get_counter(
-        '%s-out%s-MeanByteCount' % (step_name, output_index), Counter.MEAN)
+        '%s-out%s-MeanByteCount' % (step_name, output_index),
+        Counter.BEAM_DISTRIBUTION)
     self.coder_impl = coder.get_impl() if coder else None
     self.active_accumulator = None
     self.current_size = None
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index 511b9b2..e850f6d 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -98,9 +98,9 @@ class OperationCountersTest(unittest.TestCase):
     self.assertEqual(expected_elements, opcounts.element_counter.value())
     if expected_size is not None:
       if math.isnan(expected_size):
-        self.assertTrue(math.isnan(opcounts.mean_byte_counter.value()))
+        self.assertTrue(math.isnan(opcounts.mean_byte_counter.value()[0]))
       else:
-        self.assertEqual(expected_size, opcounts.mean_byte_counter.value())
+        self.assertEqual(expected_size, opcounts.mean_byte_counter.value()[0])
 
   def test_update_int(self):
     opcounts = OperationCounters(CounterFactory(), 'some-name',
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index a947043..5743dec 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -71,7 +71,7 @@ cdef class Operation(object):
   cpdef output(self, WindowedValue windowed_value, int output_index=*)
   cpdef execution_time_monitoring_infos(self, transform_id)
   cpdef user_monitoring_infos(self, transform_id)
-  cpdef element_count_monitoring_infos(self, transform_id)
+  cpdef pcollection_count_monitoring_infos(self, transform_id)
   cpdef monitoring_infos(self, transform_id)
 
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index de47cdd..1484c6e 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -36,6 +36,7 @@ from apache_beam.io import iobase
 from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -272,23 +273,45 @@ class Operation(object):
     """Returns the list of MonitoringInfos collected by this operation."""
     all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
     all_monitoring_infos.update(
-        self.element_count_monitoring_infos(transform_id))
+        self.pcollection_count_monitoring_infos(transform_id))
     all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
     return all_monitoring_infos
 
-  def element_count_monitoring_infos(self, transform_id):
+  def pcollection_count_monitoring_infos(self, transform_id):
     """Returns the element count MonitoringInfo collected by this operation."""
     if len(self.receivers) == 1:
       # If there is exactly one output, we can unambiguously
       # fix its name later, which we do.
       # TODO(robertwb): Plumb the actual name here.
-      mi = monitoring_infos.int64_counter(
+      elem_count_mi = monitoring_infos.int64_counter(
           monitoring_infos.ELEMENT_COUNT_URN,
           self.receivers[0].opcounter.element_counter.value(),
           ptransform=transform_id,
           tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
       )
-      return {monitoring_infos.to_key(mi) : mi}
+
+      (unused_mean, sum, count, min, max) = (
+          self.receivers[0].opcounter.mean_byte_counter.value())
+      metric = metrics_pb2.Metric(
+          distribution_data=metrics_pb2.DistributionData(
+              int_distribution_data=metrics_pb2.IntDistributionData(
+                  count=count,
+                  sum=sum,
+                  min=min,
+                  max=max
+              )
+          )
+      )
+      sampled_byte_count = monitoring_infos.int64_distribution(
+          monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+          metric,
+          ptransform=transform_id,
+          tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
+      )
+      return {
+          monitoring_infos.to_key(elem_count_mi) : elem_count_mi,
+          monitoring_infos.to_key(sampled_byte_count) : sampled_byte_count
+      }
     return {}
 
   def user_monitoring_infos(self, transform_id):
@@ -609,6 +632,25 @@ class DoOperation(Operation):
             tag=str(tag)
         )
         infos[monitoring_infos.to_key(mi)] = mi
+        (unused_mean, sum, count, min, max) = (
+            receiver.opcounter.mean_byte_counter.value())
+        metric = metrics_pb2.Metric(
+            distribution_data=metrics_pb2.DistributionData(
+                int_distribution_data=metrics_pb2.IntDistributionData(
+                    count=count,
+                    sum=sum,
+                    min=min,
+                    max=max
+                )
+            )
+        )
+        sampled_byte_count = monitoring_infos.int64_distribution(
+            monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+            metric,
+            ptransform=transform_id,
+            tag=str(tag)
+        )
+        infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
     return infos
 
 
@@ -666,7 +708,8 @@ class SdfProcessSizedElements(DoOperation):
     for receiver in self.tagged_receivers.values():
       elements = receiver.opcounter.element_counter.value()
       if elements > 0:
-        total += elements * receiver.opcounter.mean_byte_counter.value()
+        mean = (receiver.opcounter.mean_byte_counter.value())[0]
+        total += elements * mean
     return total