You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 22:47:05 UTC

[beam] branch master updated: Inject PCollection id into element_count metric.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c2cd3  Inject PCollection id into element_count metric.
     new 05bc3f6  Merge pull request #7942 from Ardagan/RowCount0.1
35c2cd3 is described below

commit 35c2cd3bfcc7c3ee2fa9ae6fcb3e373bcd4e50ef
Author: Mikhail Gryzykhin <mi...@google.com>
AuthorDate: Fri Feb 22 13:54:51 2019 -0800

    Inject PCollection id into element_count metric.
---
 .../python/apache_beam/metrics/monitoring_infos.py |   9 +-
 .../runners/portability/fn_api_runner.py           |   5 +
 .../runners/portability/fn_api_runner_test.py      | 101 ++++++++++++++++++---
 .../apache_beam/runners/worker/bundle_processor.py |  40 +++++++-
 4 files changed, 137 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2e9059f..d2fa09d 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -55,6 +55,11 @@ COUNTER_TYPES = set([SUM_INT64_TYPE])
 DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
 GAUGE_TYPES = set([LATEST_INT64_TYPE])
 
+# TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels
+PCOLLECTION_LABEL = 'PCOLLECTION'
+PTRANSFORM_LABEL = 'PTRANSFORM'
+TAG_LABEL = 'TAG'
+
 
 def to_timestamp_proto(timestamp_secs):
   """Converts seconds since epoch to a google.protobuf.Timestamp.
@@ -103,9 +108,9 @@ def create_labels(ptransform='', tag=''):
   """
   labels = {}
   if tag:
-    labels['TAG'] = tag
+    labels[TAG_LABEL] = tag
   if ptransform:
-    labels['PTRANSFORM'] = ptransform
+    labels[PTRANSFORM_LABEL] = ptransform
   return labels
 
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e4cd6a4..a6eda80 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -1375,6 +1375,7 @@ class FnApiMetrics(metrics.metric.MetricResults):
     self._gauges = {}
     self._user_metrics_only = user_metrics_only
     self._init_metrics_from_monitoring_infos(step_monitoring_infos)
+    self._monitoring_infos = step_monitoring_infos
 
   def _init_metrics_from_monitoring_infos(self, step_monitoring_infos):
     for smi in step_monitoring_infos.values():
@@ -1415,6 +1416,10 @@ class FnApiMetrics(metrics.metric.MetricResults):
             self.DISTRIBUTIONS: distributions,
             self.GAUGES: gauges}
 
+  def monitoring_infos(self):
+    """Returns all stored monitoring infos as a single flat list."""
+    return [mi for mis in self._monitoring_infos.values() for mi in mis]
+
 
 class RunnerResult(runner.PipelineResult):
   def __init__(self, state, monitoring_infos_by_stage, metrics_by_stage):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e03240d..c023774 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -562,6 +562,76 @@ class FnApiRunnerTest(unittest.TestCase):
     self.assertEqual(dist.committed.mean, 2.0)
     self.assertEqual(gaug.committed.value, 3)
 
+  def test_element_count_metrics(self):
+    """Checks that element count metrics are labelled with PCollection ids."""
+    class GenerateTwoOutputs(beam.DoFn):
+      def process(self, element):
+        yield str(element) + '1'
+        yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
+        yield beam.pvalue.TaggedOutput('SecondOutput', str(element) + '2')
+        yield beam.pvalue.TaggedOutput('ThirdOutput', str(element) + '3')
+
+    class PrintElements(beam.DoFn):
+      def process(self, element):
+        logging.debug(element)
+        yield element
+
+    p = self.create_pipeline()
+    if not isinstance(p.runner, fn_api_runner.FnApiRunner):
+      # Inherited by suites whose runner may not expose these internal metrics.
+      self.skipTest('Metrics not supported.')
+
+    pcoll = p | beam.Create(['a1', 'a2'])
+
+    # pylint: disable=expression-not-assigned
+    pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
+        GenerateTwoOutputs()).with_outputs('SecondOutput',
+                                           'ThirdOutput',
+                                           main='FirstAndMainOutput'))
+
+    # Actually feed pcollection to pardo
+    second_output, third_output, first_output = (pcoll | pardo)
+
+    # Consume some of the elements.
+    merged = ((first_output, second_output, third_output) | beam.Flatten())
+    merged | ('PrintingStep') >> beam.ParDo(PrintElements())
+    second_output | ('PrintingStep2') >> beam.ParDo(PrintElements())
+
+    res = p.run()
+    res.wait_until_finish()
+
+    result_metrics = res.monitoring_metrics()
+
+    def assert_contains_metric(src, urn, pcollection, value):
+      """Asserts src holds a urn metric for pcollection equal to value."""
+      for item in src:
+        if item.urn == urn:
+          if item.labels[monitoring_infos.PCOLLECTION_LABEL] == pcollection:
+            self.assertEqual(item.metric.counter_data.int64_value, value,
+                             str(("Metric has incorrect value", value, item)))
+            return
+      self.fail(str(("Metric not found", urn, pcollection, src)))
+
+    counters = result_metrics.monitoring_infos()
+    self.assertFalse([x for x in counters if
+                      x.urn == monitoring_infos.ELEMENT_COUNT_URN
+                      and
+                      monitoring_infos.PCOLLECTION_LABEL not in x.labels])
+
+    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+                           'Impulse', 1)
+    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+                           'ref_PCollection_PCollection_1', 2)
+
+    # Other pcollections are skipped: multi-output naming is non-deterministic.
+
+    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+                           'ref_PCollection_PCollection_5', 8)
+    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+                           'ref_PCollection_PCollection_6', 8)
+    assert_contains_metric(counters, monitoring_infos.ELEMENT_COUNT_URN,
+                           'ref_PCollection_PCollection_7', 2)
+
   def test_non_user_metrics(self):
     p = self.create_pipeline()
     if not isinstance(p.runner, fn_api_runner.FnApiRunner):
@@ -587,7 +657,6 @@ class FnApiRunnerTest(unittest.TestCase):
       self.assertEqual(
           1, found, "Did not find exactly 1 metric for %s." % metric_key)
     urns = [
-        monitoring_infos.ELEMENT_COUNT_URN,
         monitoring_infos.START_BUNDLE_MSECS_URN,
         monitoring_infos.PROCESS_BUNDLE_MSECS_URN,
         monitoring_infos.FINISH_BUNDLE_MSECS_URN,
@@ -613,6 +682,7 @@ class FnApiRunnerTest(unittest.TestCase):
       # This test is inherited by others that may not support the same
       # internal way of accessing progress metrics.
       self.skipTest('Progress metrics not supported.')
+      return
 
     _ = (p
          | beam.Create([0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS])
@@ -624,6 +694,7 @@ class FnApiRunnerTest(unittest.TestCase):
              beam.pvalue.TaggedOutput('once', x),
              beam.pvalue.TaggedOutput('twice', x),
              beam.pvalue.TaggedOutput('twice', x)]))
+
     res = p.run()
     res.wait_until_finish()
 
@@ -675,21 +746,27 @@ class FnApiRunnerTest(unittest.TestCase):
       # Test the new MonitoringInfo monitoring format.
       self.assertEqual(2, len(res._monitoring_infos_by_stage))
       pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
+
       if not has_mi_for_ptransform(pregbk_mis, 'Create/Read'):
         # The monitoring infos above are actually unordered. Swap.
         pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
 
       def assert_has_monitoring_info(
           monitoring_infos, urn, labels, value=None, ge_value=None):
+        def contains_labels(monitoring_info, labels):
+          return len([x for x in labels.items() if
+                      x[0] in monitoring_info.labels and monitoring_info.labels[
+                          x[0]] == x[1]]) == len(labels)
+
         # TODO(ajamato): Consider adding a matcher framework
         found = 0
-        for m in monitoring_infos:
-          if m.labels == labels and m.urn == urn:
+        for mi in monitoring_infos:
+          if contains_labels(mi, labels) and mi.urn == urn:
             if (ge_value is not None and
-                m.metric.counter_data.int64_value >= ge_value):
+                mi.metric.counter_data.int64_value >= ge_value):
               found = found + 1
             elif (value is not None and
-                  m.metric.counter_data.int64_value == value):
+                  mi.metric.counter_data.int64_value == value):
               found = found + 1
         ge_value_str = {'ge_value' : ge_value} if ge_value else ''
         value_str = {'value' : value} if value else ''
@@ -698,10 +775,10 @@ class FnApiRunnerTest(unittest.TestCase):
             (found, (urn, labels, value_str, ge_value_str),))
 
       # pregbk monitoring infos
-      labels = {'PTRANSFORM' : 'Create/Read', 'TAG' : 'out'}
+      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_1'}
       assert_has_monitoring_info(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
-      labels = {'PTRANSFORM' : 'Map(sleep)', 'TAG' : 'None'}
+      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_2'}
       assert_has_monitoring_info(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
       labels = {'PTRANSFORM' : 'Map(sleep)'}
@@ -710,18 +787,12 @@ class FnApiRunnerTest(unittest.TestCase):
           labels, ge_value=4 * DEFAULT_SAMPLING_PERIOD_MS)
 
       # postgbk monitoring infos
-      labels = {'PTRANSFORM' : 'GroupByKey/Read', 'TAG' : 'None'}
+      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_6'}
       assert_has_monitoring_info(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
-      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'None'}
+      labels = {'PCOLLECTION' : 'ref_PCollection_PCollection_7'}
       assert_has_monitoring_info(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
-      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'once'}
-      assert_has_monitoring_info(
-          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
-      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'twice'}
-      assert_has_monitoring_info(
-          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=2)
     except:
       print(res._monitoring_infos_by_stage)
       raise
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 0eba87c..9455879 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -541,6 +541,7 @@ class BundleProcessor(object):
         # ignores input name
         input_op_by_target[
             input_op.target.primitive_transform_reference] = input_op
+
       for data_channel, expected_targets in data_channels.items():
         for data in data_channel.input_elements(
             instruction_id, expected_targets):
@@ -649,7 +650,44 @@ class BundleProcessor(object):
       for mi in op.monitoring_infos(transform_id).values():
         fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
         all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-    return list(all_monitoring_infos_dict.values())
+
+    infos_list = list(all_monitoring_infos_dict.values())
+
+    def inject_pcollection_into_element_count(monitoring_info):
+      """Relabels an element count metric with its output PCollection id.
+
+      Metrics with another urn, without both PTRANSFORM and TAG labels, or
+      whose transform/output tag is absent from the current
+      process_bundle_descriptor are left untouched. Mutates in place.
+      """
+      if monitoring_info.urn != monitoring_infos.ELEMENT_COUNT_URN:
+        return
+      labels = monitoring_info.labels
+      if monitoring_infos.PTRANSFORM_LABEL not in labels:
+        return
+      if monitoring_infos.TAG_LABEL not in labels:
+        return
+      ptransform_label = labels[monitoring_infos.PTRANSFORM_LABEL]
+      tag_label = labels[monitoring_infos.TAG_LABEL]
+
+      transforms = self.process_bundle_descriptor.transforms
+      if ptransform_label not in transforms:
+        return
+      outputs = transforms[ptransform_label].outputs
+      if tag_label not in outputs:
+        return
+
+      # Attach the id of the PCollection produced on that output tag.
+      labels[monitoring_infos.PCOLLECTION_LABEL] = outputs[tag_label]
+
+      # Cleaning up labels that are not in specification.
+      labels.pop(monitoring_infos.PTRANSFORM_LABEL)
+      labels.pop(monitoring_infos.TAG_LABEL)
+
+    for mi in infos_list:
+      inject_pcollection_into_element_count(mi)
+
+    return infos_list
 
   def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
     actual_output_tags = list(