Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/15 20:26:46 UTC

[beam] branch master updated: Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c2b9d2  Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
2c2b9d2 is described below

commit 2c2b9d2ba7c3cea048688a8fc0abd25030046005
Author: Alex Amato <aj...@google.com>
AuthorDate: Fri Mar 15 13:26:34 2019 -0700

    Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
    
    [BEAM-6844] Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test.
---
 .../dataflow/dataflow_exercise_metrics_pipeline.py | 199 +++++++++++++++++++++
 .../dataflow_exercise_metrics_pipeline_test.py     |  68 +++++++
 .../runners/dataflow/dataflow_metrics.py           |  80 +++++++--
 .../runners/dataflow/dataflow_metrics_test.py      |  10 +-
 4 files changed, 336 insertions(+), 21 deletions(-)
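
The core pattern this commit introduces is matcher-based verification of the
MetricResults reported by a finished job. The sketch below is illustrative
rather than taken verbatim from the commit (the expected counter value of 100
and the omission of a namespace are simplifications); it assumes PyHamcrest is
installed and that `result` passed to the helper is the PipelineResult of a
completed Dataflow job:

    from hamcrest.library.number.ordering_comparison import greater_than

    from apache_beam.testing import metric_result_matchers
    from apache_beam.testing.metric_result_matchers import MetricResultMatcher


    def check_job_metrics(result):
      """Checks a completed Dataflow PipelineResult against a few matchers."""
      matchers = [
          # A user counter: the expected attempted/committed values are known
          # up front (namespace and labels can also be matched if needed).
          MetricResultMatcher(
              name='total_values', step='metrics',
              attempted=100, committed=100),
          # A system execution-time metric: only "greater than zero" is a
          # portable expectation, so a hamcrest matcher is used instead.
          MetricResultMatcher(
              name='ExecutionTime_ProcessElement', step='metrics',
              attempted=greater_than(0), committed=greater_than(0)),
      ]
      errors = metric_result_matchers.verify_all(
          result.metrics().all_metrics(), matchers)
      assert not errors, str(errors)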

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
new file mode 100644
index 0000000..a77497f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import time
+
+from hamcrest.library.number.ordering_comparison import greater_than
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.testing.metric_result_matchers import DistributionMatcher
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
+
+SLEEP_TIME_SECS = 1
+INPUT = [0, 0, 0, 100]
+METRIC_NAMESPACE = ('apache_beam.runners.dataflow.'
+                    'dataflow_exercise_metrics_pipeline.UserMetricsDoFn')
+
+
+def common_metric_matchers():
+  """MetricResult matchers common to all tests."""
+  # TODO(ajamato): Matcher for the 'metrics' step's ElementCount.
+  # TODO(ajamato): Matcher for the 'metrics' step's MeanByteCount.
+  # TODO(ajamato): Matcher for the start and finish exec times.
+  # TODO(ajamato): Matcher for a gauge metric once implemented in dataflow.
+  matchers = [
+      # User Counter Metrics.
+      MetricResultMatcher(
+          name='total_values',
+          namespace=METRIC_NAMESPACE,
+          step='metrics',
+          attempted=sum(INPUT),
+          committed=sum(INPUT)
+      ),
+      MetricResultMatcher(
+          name='ExecutionTime_StartBundle',
+          step='metrics',
+          attempted=greater_than(0),
+          committed=greater_than(0)
+      ),
+      MetricResultMatcher(
+          name='ExecutionTime_ProcessElement',
+          step='metrics',
+          attempted=greater_than(0),
+          committed=greater_than(0)
+      ),
+      MetricResultMatcher(
+          name='ExecutionTime_FinishBundle',
+          step='metrics',
+          attempted=greater_than(0),
+          committed=greater_than(0)
+      )
+  ]
+
+  pcoll_names = [
+      'GroupByKey/Reify-out0',
+      'GroupByKey/Read-out0',
+      'map_to_common_key-out0',
+      'GroupByKey/GroupByWindow-out0',
+      'GroupByKey/Read-out0',
+      'GroupByKey/Reify-out0'
+  ]
+  for name in pcoll_names:
+    matchers.extend([
+        MetricResultMatcher(
+            name='ElementCount',
+            labels={
+                'output_user_name': name,
+                'original_name': '%s-ElementCount' % name
+            },
+            attempted=greater_than(0),
+            committed=greater_than(0)
+        ),
+        MetricResultMatcher(
+            name='MeanByteCount',
+            labels={
+                'output_user_name': name,
+                'original_name': '%s-MeanByteCount' % name
+            },
+            attempted=greater_than(0),
+            committed=greater_than(0)
+        ),
+    ])
+  return matchers
+
+
+def fn_api_metric_matchers():
+  """MetricResult matchers with adjusted step names for the FN API DF test."""
+  matchers = common_metric_matchers()
+  return matchers
+
+
+def legacy_metric_matchers():
+  """MetricResult matchers with adjusted step names for the legacy DF test."""
+  # TODO(ajamato): Move these to the common_metric_matchers once implemented
+  # in the FN API.
+  matchers = common_metric_matchers()
+  matchers.extend([
+      # User distribution metric, legacy DF only.
+      MetricResultMatcher(
+          name='distribution_values',
+          namespace=METRIC_NAMESPACE,
+          step='metrics',
+          attempted=DistributionMatcher(
+              sum_value=sum(INPUT),
+              count_value=len(INPUT),
+              min_value=min(INPUT),
+              max_value=max(INPUT)
+          ),
+          committed=DistributionMatcher(
+              sum_value=sum(INPUT),
+              count_value=len(INPUT),
+              min_value=min(INPUT),
+              max_value=max(INPUT)
+          ),
+      ),
+      # Element count and MeanByteCount for a User ParDo.
+      MetricResultMatcher(
+          name='ElementCount',
+          labels={
+              'output_user_name': 'metrics-out0',
+              'original_name': 'metrics-out0-ElementCount'
+          },
+          attempted=greater_than(0),
+          committed=greater_than(0)
+      ),
+      MetricResultMatcher(
+          name='MeanByteCount',
+          labels={
+              'output_user_name': 'metrics-out0',
+              'original_name': 'metrics-out0-MeanByteCount'
+          },
+          attempted=greater_than(0),
+          committed=greater_than(0)
+      ),
+  ])
+  return matchers
+
+
+class UserMetricsDoFn(beam.DoFn):
+  """Parse each line of input text into words."""
+
+  def __init__(self):
+    self.total_metric = Metrics.counter(self.__class__, 'total_values')
+    self.dist_metric = Metrics.distribution(
+        self.__class__, 'distribution_values')
+    # TODO(ajamato): Add a verifier for gauge once it is supported by the SDKs
+    # and runners.
+    self.latest_metric = Metrics.gauge(self.__class__, 'latest_value')
+
+  def start_bundle(self):
+    time.sleep(SLEEP_TIME_SECS)
+
+  def process(self, element):
+    """Returns the processed element and increments the metrics."""
+    elem_int = int(element)
+    self.total_metric.inc(elem_int)
+    self.dist_metric.update(elem_int)
+    self.latest_metric.set(elem_int)
+    time.sleep(SLEEP_TIME_SECS)
+    return [elem_int]
+
+  def finish_bundle(self):
+    time.sleep(SLEEP_TIME_SECS)
+
+
+def apply_and_run(pipeline):
+  """Given an initialized Pipeline applies transforms and runs it."""
+  _ = (pipeline
+       | beam.Create(INPUT)
+       | 'metrics' >> (beam.ParDo(UserMetricsDoFn()))
+       | 'map_to_common_key' >> beam.Map(lambda x: ('key', x))
+       | beam.GroupByKey()
+       | 'm_out' >> beam.FlatMap(lambda x: [
+           1, 2, 3, 4, 5,
+           beam.pvalue.TaggedOutput('once', x),
+           beam.pvalue.TaggedOutput('twice', x),
+           beam.pvalue.TaggedOutput('twice', x)])
+      )
+  result = pipeline.run()
+  result.wait_until_finish()
+  return result
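
Since apply_and_run() only needs an initialized Pipeline, the transforms above
can be smoke-tested locally before pointing the integration test at Dataflow.
A minimal sketch, assuming PyHamcrest is installed (the module imports it at
load time) and that the default DirectRunner is acceptable; the matcher-based
verification itself is only meaningful against the Dataflow service:

    import apache_beam as beam

    from apache_beam.runners.dataflow import dataflow_exercise_metrics_pipeline

    # beam.Pipeline() with no options defaults to the DirectRunner;
    # apply_and_run() applies the transforms, runs the pipeline and blocks
    # until it finishes.
    result = dataflow_exercise_metrics_pipeline.apply_and_run(beam.Pipeline())
    print(result.state)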
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
new file mode 100644
index 0000000..c62824d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.dataflow import dataflow_exercise_metrics_pipeline
+from apache_beam.testing import metric_result_matchers
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class ExerciseMetricsPipelineTest(unittest.TestCase):
+
+  def run_pipeline(self, **opts):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    argv = test_pipeline.get_full_options_as_args(**opts)
+    parser = argparse.ArgumentParser()
+    unused_known_args, pipeline_args = parser.parse_known_args(argv)
+
+    # We use the save_main_session option because one or more DoFns in this
+    # workflow rely on global context (e.g., a module imported at module level).
+    pipeline_options = PipelineOptions(pipeline_args)
+    pipeline_options.view_as(SetupOptions).save_main_session = True
+    p = beam.Pipeline(options=pipeline_options)
+    return dataflow_exercise_metrics_pipeline.apply_and_run(p)
+
+  @attr('IT')
+  def test_metrics_it(self):
+    result = self.run_pipeline()
+    errors = metric_result_matchers.verify_all(
+        result.metrics().all_metrics(),
+        dataflow_exercise_metrics_pipeline.legacy_metric_matchers())
+    self.assertFalse(errors, str(errors))
+
+  @attr('IT', 'ValidatesContainer')
+  def test_metrics_fnapi_it(self):
+    result = self.run_pipeline(experiment='beam_fn_api')
+    errors = metric_result_matchers.verify_all(
+        result.metrics().all_metrics(),
+        dataflow_exercise_metrics_pipeline.fn_api_metric_matchers())
+    self.assertFalse(errors, str(errors))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 8a90cd6..741e944 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -23,7 +23,10 @@ service.
 
 from __future__ import absolute_import
 
+import argparse
+import logging
 import numbers
+import sys
 from collections import defaultdict
 
 from future.utils import iteritems
@@ -34,6 +37,8 @@ from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metricbase import MetricName
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 def _get_match(proto, filter_fn):
@@ -52,8 +57,9 @@ def _get_match(proto, filter_fn):
 
 
 # V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
-STRUCTURED_NAME_LABELS = [
-    'execution_step', 'original_name', 'output_user_name', 'step']
+STEP_LABEL = 'step'
+STRUCTURED_NAME_LABELS = set([
+    'execution_step', 'original_name', 'output_user_name'])
 
 
 class DataflowMetrics(MetricResults):
@@ -104,6 +110,7 @@ class DataflowMetrics(MetricResults):
     """Populate the MetricKey object for a queried metric result."""
     step = ""
     name = metric.name.name # Always extract a name
+    labels = dict()
     try: # Try to extract the user step name.
       # If ValueError is thrown within this try-block, it is because of
       # one of the following:
@@ -113,7 +120,7 @@ class DataflowMetrics(MetricResults):
       # 2. Unable to unpack [step] or [namespace]; which should only happen
       #   for unstructured names.
       step = _get_match(metric.name.context.additionalProperties,
-                        lambda x: x.key == 'step').value
+                        lambda x: x.key == STEP_LABEL).value
       step = self._translate_step_name(step)
     except ValueError:
       pass
@@ -125,7 +132,6 @@ class DataflowMetrics(MetricResults):
     except ValueError:
       pass
 
-    labels = dict()
     for kv in metric.name.context.additionalProperties:
       if kv.key in STRUCTURED_NAME_LABELS:
         labels[kv.key] = kv.value
@@ -174,8 +180,6 @@ class DataflowMetrics(MetricResults):
     for metric_key, metric in iteritems(metrics_by_name):
       attempted = self._get_metric_value(metric['tentative'])
       committed = self._get_metric_value(metric['committed'])
-      if attempted is None or committed is None:
-        continue
       result.append(MetricResult(metric_key,
                                  attempted=attempted,
                                  committed=committed))
@@ -202,12 +206,13 @@ class DataflowMetrics(MetricResults):
     else:
       return None
 
-  def _get_metrics_from_dataflow(self):
+  def _get_metrics_from_dataflow(self, job_id=None):
     """Return cached metrics or query the dataflow service."""
-    try:
-      job_id = self.job_result.job_id()
-    except AttributeError:
-      job_id = None
+    if not job_id:
+      try:
+        job_id = self.job_result.job_id()
+      except AttributeError:
+        job_id = None
     if not job_id:
       raise ValueError('Can not query metrics. Job id is unknown.')
 
@@ -215,15 +220,16 @@ class DataflowMetrics(MetricResults):
       return self._cached_metrics
 
     job_metrics = self._dataflow_client.get_job_metrics(job_id)
-    # If the job has terminated, metrics will not change and we can cache them.
-    if self.job_result.is_in_terminal_state():
+    # Metrics for a job in a terminal state can no longer change, so it is
+    # safe to cache them once we know the job has terminated.
+    if self.job_result and self.job_result.is_in_terminal_state():
       self._cached_metrics = job_metrics
     return job_metrics
 
-  def all_metrics(self):
+  def all_metrics(self, job_id=None):
     """Return all user and system metrics from the dataflow service."""
     metric_results = []
-    response = self._get_metrics_from_dataflow()
+    response = self._get_metrics_from_dataflow(job_id=job_id)
     self._populate_metrics(response, metric_results, user_metrics=True)
     self._populate_metrics(response, metric_results, user_metrics=False)
     return metric_results
@@ -239,3 +245,47 @@ class DataflowMetrics(MetricResults):
                                  if self.matches(filter, elm.key)
                                  and DataflowMetrics._is_distribution(elm)],
             self.GAUGES: []}  # TODO(pabloem): Add Gauge support for dataflow.
+
+
+def main(argv):
+  """Print the metric results for a the dataflow --job_id and --project.
+
+  Instead of running an entire pipeline (which takes several minutes), use
+  this main method to display the MetricResults for a specific --job_id and
+  --project in only a few seconds.
+  """
+  # TODO(BEAM-6833): The MetricResults do not show translated step names as the
+  # job_graph is not provided to DataflowMetrics.
+  # Import here to avoid adding the dependency for local running scenarios.
+  try:
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners.dataflow.internal import apiclient
+  except ImportError:
+    raise ImportError(
+        'Google Cloud Dataflow runner not available, '
+        'please install apache_beam[gcp]')
+  if argv[0] == __file__:
+    argv = argv[1:]
+  parser = argparse.ArgumentParser()
+  parser.add_argument('-j', '--job_id', type=str,
+                      help='The job id to query metrics for.')
+  parser.add_argument('-p', '--project', type=str,
+                      help='The project name to query metrics for.')
+  flags = parser.parse_args(argv)
+
+  # Get a Dataflow API client and set its project and job_id in the options.
+  options = PipelineOptions()
+  gcloud_options = options.view_as(GoogleCloudOptions)
+  gcloud_options.project = flags.project
+  dataflow_client = apiclient.DataflowApplicationClient(options)
+  df_metrics = DataflowMetrics(dataflow_client)
+  all_metrics = df_metrics.all_metrics(job_id=flags.job_id)
+  logging.info('Printing all MetricResults for %s in %s',
+               flags.job_id, flags.project)
+  for metric_result in all_metrics:
+    logging.info(metric_result)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  main(sys.argv)
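
The new main() makes it possible to print the MetricResults for an
already-finished job in a few seconds instead of re-running a pipeline. A
minimal sketch of invoking it programmatically; the job id and project below
are placeholders, and apache_beam[gcp] plus valid Google Cloud credentials are
assumed:

    import logging

    from apache_beam.runners.dataflow import dataflow_metrics

    # Raise the log level so the per-metric logging.info() calls are visible.
    logging.getLogger().setLevel(logging.INFO)
    # Placeholder values; substitute a real Dataflow job id and GCP project.
    dataflow_metrics.main(['--job_id', '<your-dataflow-job-id>',
                           '--project', '<your-gcp-project>'])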
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index cdce02c..9899176 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -340,7 +340,7 @@ class TestDataflowMetrics(unittest.TestCase):
         MetricResult(
             MetricKey('split',
                       MetricName('__main__.WordExtractingDoFn', 'word_lengths'),
-                      labels={'step': 's2'}),
+                     ),
             109475, 109475),
         ]
     self.assertEqual(query_result['counters'], expected_counters)
@@ -350,7 +350,7 @@ class TestDataflowMetrics(unittest.TestCase):
             MetricKey('split',
                       MetricName('__main__.WordExtractingDoFn',
                                  'word_length_dist'),
-                      labels={'step': 's2'}),
+                     ),
             DistributionResult(DistributionData(
                 18, 2, 2, 16)),
             DistributionResult(DistributionData(
@@ -367,14 +367,12 @@ class TestDataflowMetrics(unittest.TestCase):
     expected_counters = [
         MetricResult(
             MetricKey('split',
-                      MetricName('__main__.WordExtractingDoFn', 'empty_lines'),
-                      labels={'step': 's2'}
+                      MetricName('__main__.WordExtractingDoFn', 'empty_lines')
                      ),
             1080, 1080),
         MetricResult(
             MetricKey('split',
-                      MetricName('__main__.WordExtractingDoFn', 'words'),
-                      labels={'step': 's2'}),
+                      MetricName('__main__.WordExtractingDoFn', 'words')),
             26181, 26185),
         ]
     self.assertEqual(sorted(query_result['counters'],