Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/15 20:26:46 UTC
[beam] branch master updated: Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2c2b9d2 Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
2c2b9d2 is described below
commit 2c2b9d2ba7c3cea048688a8fc0abd25030046005
Author: Alex Amato <aj...@google.com>
AuthorDate: Fri Mar 15 13:26:34 2019 -0700
Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
[BEAM-6844] Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test.
---
.../dataflow/dataflow_exercise_metrics_pipeline.py | 199 +++++++++++++++++++++
.../dataflow_exercise_metrics_pipeline_test.py | 68 +++++++
.../runners/dataflow/dataflow_metrics.py | 80 +++++++--
.../runners/dataflow/dataflow_metrics_test.py | 10 +-
4 files changed, 336 insertions(+), 21 deletions(-)
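For orientation, the verification pattern this change introduces looks roughly like the sketch below. It is a condensed illustration of the code in the diff, not an additional file; the single matcher and the elided pipeline run are placeholders.

    from hamcrest.library.number.ordering_comparison import greater_than
    from apache_beam.testing import metric_result_matchers
    from apache_beam.testing.metric_result_matchers import MetricResultMatcher
    from apache_beam.testing.test_pipeline import TestPipeline

    # Describe the metrics the job is expected to report. 'attempted' and
    # 'committed' accept exact values or hamcrest matchers.
    matchers = [
        MetricResultMatcher(
            name='total_values',
            step='metrics',
            attempted=greater_than(0),
            committed=greater_than(0)),
    ]

    # Run the pipeline, then compare every reported MetricResult against the
    # matchers; verify_all returns a falsy value when everything matches.
    pipeline = TestPipeline(is_integration_test=True)
    result = ...  # e.g. dataflow_exercise_metrics_pipeline.apply_and_run(pipeline)
    errors = metric_result_matchers.verify_all(
        result.metrics().all_metrics(), matchers)
    assert not errors, errors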
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
new file mode 100644
index 0000000..a77497f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import time
+
+from hamcrest.library.number.ordering_comparison import greater_than
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.testing.metric_result_matchers import DistributionMatcher
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
+
+SLEEP_TIME_SECS = 1
+INPUT = [0, 0, 0, 100]
+METRIC_NAMESPACE = ('apache_beam.runners.dataflow.'
+ 'dataflow_exercise_metrics_pipeline.UserMetricsDoFn')
+
+
+def common_metric_matchers():
+ """MetricResult matchers common to all tests."""
+ # TODO(ajamato): Matcher for the 'metrics' step's ElementCount.
+ # TODO(ajamato): Matcher for the 'metrics' step's MeanByteCount.
+ # TODO(ajamato): Matcher for the start and finish exec times.
+ # TODO(ajamato): Matcher for a gauge metric once implemented in dataflow.
+ matchers = [
+ # User Counter Metrics.
+ MetricResultMatcher(
+ name='total_values',
+ namespace=METRIC_NAMESPACE,
+ step='metrics',
+ attempted=sum(INPUT),
+ committed=sum(INPUT)
+ ),
+ MetricResultMatcher(
+ name='ExecutionTime_StartBundle',
+ step='metrics',
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ MetricResultMatcher(
+ name='ExecutionTime_ProcessElement',
+ step='metrics',
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ MetricResultMatcher(
+ name='ExecutionTime_FinishBundle',
+ step='metrics',
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ )
+ ]
+
+ pcoll_names = [
+ 'GroupByKey/Reify-out0',
+ 'GroupByKey/Read-out0',
+ 'map_to_common_key-out0',
+ 'GroupByKey/GroupByWindow-out0',
+ 'GroupByKey/Read-out0',
+ 'GroupByKey/Reify-out0'
+ ]
+ for name in pcoll_names:
+ matchers.extend([
+ MetricResultMatcher(
+ name='ElementCount',
+ labels={
+ 'output_user_name': name,
+ 'original_name': '%s-ElementCount' % name
+ },
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ MetricResultMatcher(
+ name='MeanByteCount',
+ labels={
+ 'output_user_name': name,
+ 'original_name': '%s-MeanByteCount' % name
+ },
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ ])
+ return matchers
+
+
+def fn_api_metric_matchers():
+ """MetricResult matchers with adjusted step names for the FN API DF test."""
+ matchers = common_metric_matchers()
+ return matchers
+
+
+def legacy_metric_matchers():
+ """MetricResult matchers with adjusted step names for the legacy DF test."""
+ # TODO(ajamato): Move these to the common_metric_matchers once implemented
+ # in the FN API.
+ matchers = common_metric_matchers()
+ matchers.extend([
+ # User distribution metric, legacy DF only.
+ MetricResultMatcher(
+ name='distribution_values',
+ namespace=METRIC_NAMESPACE,
+ step='metrics',
+ attempted=DistributionMatcher(
+ sum_value=sum(INPUT),
+ count_value=len(INPUT),
+ min_value=min(INPUT),
+ max_value=max(INPUT)
+ ),
+ committed=DistributionMatcher(
+ sum_value=sum(INPUT),
+ count_value=len(INPUT),
+ min_value=min(INPUT),
+ max_value=max(INPUT)
+ ),
+ ),
+ # Element count and MeanByteCount for a User ParDo.
+ MetricResultMatcher(
+ name='ElementCount',
+ labels={
+ 'output_user_name': 'metrics-out0',
+ 'original_name': 'metrics-out0-ElementCount'
+ },
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ MetricResultMatcher(
+ name='MeanByteCount',
+ labels={
+ 'output_user_name': 'metrics-out0',
+ 'original_name': 'metrics-out0-MeanByteCount'
+ },
+ attempted=greater_than(0),
+ committed=greater_than(0)
+ ),
+ ])
+ return matchers
+
+
+class UserMetricsDoFn(beam.DoFn):
+ """Parse each line of input text into words."""
+
+ def __init__(self):
+ self.total_metric = Metrics.counter(self.__class__, 'total_values')
+ self.dist_metric = Metrics.distribution(
+ self.__class__, 'distribution_values')
+ # TODO(ajamato): Add a verifier for gauge once it is supported by the SDKs
+ # and runners.
+ self.latest_metric = Metrics.gauge(self.__class__, 'latest_value')
+
+ def start_bundle(self):
+ time.sleep(SLEEP_TIME_SECS)
+
+ def process(self, element):
+ """Returns the processed element and increments the metrics."""
+ elem_int = int(element)
+ self.total_metric.inc(elem_int)
+ self.dist_metric.update(elem_int)
+ self.latest_metric.set(elem_int)
+ time.sleep(SLEEP_TIME_SECS)
+ return [elem_int]
+
+ def finish_bundle(self):
+ time.sleep(SLEEP_TIME_SECS)
+
+
+def apply_and_run(pipeline):
+ """Given an initialized Pipeline applies transforms and runs it."""
+ _ = (pipeline
+ | beam.Create(INPUT)
+ | 'metrics' >> (beam.ParDo(UserMetricsDoFn()))
+ | 'map_to_common_key' >> beam.Map(lambda x: ('key', x))
+ | beam.GroupByKey()
+ | 'm_out' >> beam.FlatMap(lambda x: [
+ 1, 2, 3, 4, 5,
+ beam.pvalue.TaggedOutput('once', x),
+ beam.pvalue.TaggedOutput('twice', x),
+ beam.pvalue.TaggedOutput('twice', x)])
+ )
+ result = pipeline.run()
+ result.wait_until_finish()
+ return result
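As a worked example of what the matchers above pin down (simple arithmetic on the INPUT constant in this file, not part of the commit): with INPUT = [0, 0, 0, 100], the 'total_values' counter must report 100 for both attempted and committed, and the legacy-only distribution matcher expects sum 100, count 4, min 0, max 100.

    INPUT = [0, 0, 0, 100]
    assert sum(INPUT) == 100   # 'total_values' attempted and committed
    assert len(INPUT) == 4     # 'distribution_values' count_value
    assert min(INPUT) == 0     # 'distribution_values' min_value
    assert max(INPUT) == 100   # 'distribution_values' max_value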
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
new file mode 100644
index 0000000..c62824d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.dataflow import dataflow_exercise_metrics_pipeline
+from apache_beam.testing import metric_result_matchers
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class ExerciseMetricsPipelineTest(unittest.TestCase):
+
+ def run_pipeline(self, **opts):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ argv = test_pipeline.get_full_options_as_args(**opts)
+ parser = argparse.ArgumentParser()
+ unused_known_args, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFns in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
+ return dataflow_exercise_metrics_pipeline.apply_and_run(p)
+
+ @attr('IT')
+ def test_metrics_it(self):
+ result = self.run_pipeline()
+ errors = metric_result_matchers.verify_all(
+ result.metrics().all_metrics(),
+ dataflow_exercise_metrics_pipeline.legacy_metric_matchers())
+ self.assertFalse(errors, str(errors))
+
+ @attr('IT', 'ValidatesContainer')
+ def test_metrics_fnapi_it(self):
+ result = self.run_pipeline(experiment='beam_fn_api')
+ errors = metric_result_matchers.verify_all(
+ result.metrics().all_metrics(),
+ dataflow_exercise_metrics_pipeline.fn_api_metric_matchers())
+ self.assertFalse(errors, str(errors))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 8a90cd6..741e944 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -23,7 +23,10 @@ service.
from __future__ import absolute_import
+import argparse
+import logging
import numbers
+import sys
from collections import defaultdict
from future.utils import iteritems
@@ -34,6 +37,8 @@ from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
def _get_match(proto, filter_fn):
@@ -52,8 +57,9 @@ def _get_match(proto, filter_fn):
# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
-STRUCTURED_NAME_LABELS = [
- 'execution_step', 'original_name', 'output_user_name', 'step']
+STEP_LABEL = 'step'
+STRUCTURED_NAME_LABELS = set([
+ 'execution_step', 'original_name', 'output_user_name'])
class DataflowMetrics(MetricResults):
@@ -104,6 +110,7 @@ class DataflowMetrics(MetricResults):
"""Populate the MetricKey object for a queried metric result."""
step = ""
name = metric.name.name # Always extract a name
+ labels = dict()
try: # Try to extract the user step name.
# If ValueError is thrown within this try-block, it is because of
# one of the following:
@@ -113,7 +120,7 @@ class DataflowMetrics(MetricResults):
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
step = _get_match(metric.name.context.additionalProperties,
- lambda x: x.key == 'step').value
+ lambda x: x.key == STEP_LABEL).value
step = self._translate_step_name(step)
except ValueError:
pass
@@ -125,7 +132,6 @@ class DataflowMetrics(MetricResults):
except ValueError:
pass
- labels = dict()
for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
@@ -174,8 +180,6 @@ class DataflowMetrics(MetricResults):
for metric_key, metric in iteritems(metrics_by_name):
attempted = self._get_metric_value(metric['tentative'])
committed = self._get_metric_value(metric['committed'])
- if attempted is None or committed is None:
- continue
result.append(MetricResult(metric_key,
attempted=attempted,
committed=committed))
@@ -202,12 +206,13 @@ class DataflowMetrics(MetricResults):
else:
return None
- def _get_metrics_from_dataflow(self):
+ def _get_metrics_from_dataflow(self, job_id=None):
"""Return cached metrics or query the dataflow service."""
- try:
- job_id = self.job_result.job_id()
- except AttributeError:
- job_id = None
+ if not job_id:
+ try:
+ job_id = self.job_result.job_id()
+ except AttributeError:
+ job_id = None
if not job_id:
raise ValueError('Can not query metrics. Job id is unknown.')
@@ -215,15 +220,16 @@ class DataflowMetrics(MetricResults):
return self._cached_metrics
job_metrics = self._dataflow_client.get_job_metrics(job_id)
- # If the job has terminated, metrics will not change and we can cache them.
- if self.job_result.is_in_terminal_state():
+ # If we can determine that the job has terminated,
+ # then metrics will not change and we can cache them.
+ if self.job_result and self.job_result.is_in_terminal_state():
self._cached_metrics = job_metrics
return job_metrics
- def all_metrics(self):
+ def all_metrics(self, job_id=None):
"""Return all user and system metrics from the dataflow service."""
metric_results = []
- response = self._get_metrics_from_dataflow()
+ response = self._get_metrics_from_dataflow(job_id=job_id)
self._populate_metrics(response, metric_results, user_metrics=True)
self._populate_metrics(response, metric_results, user_metrics=False)
return metric_results
@@ -239,3 +245,47 @@ class DataflowMetrics(MetricResults):
if self.matches(filter, elm.key)
and DataflowMetrics._is_distribution(elm)],
self.GAUGES: []} # TODO(pabloem): Add Gauge support for dataflow.
+
+
+def main(argv):
+ """Print the metric results for a the dataflow --job_id and --project.
+
+ Instead of running an entire pipeline which takes several minutes, use this
+ main method to display MetricResults for a specific --job_id and --project
+ which takes only a few seconds.
+ """
+ # TODO(BEAM-6833): The MetricResults do not show translated step names as the
+ # job_graph is not provided to DataflowMetrics.
+ # Import here to avoid adding the dependency for local running scenarios.
+ try:
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.runners.dataflow.internal import apiclient
+ except ImportError:
+ raise ImportError(
+ 'Google Cloud Dataflow runner not available, '
+ 'please install apache_beam[gcp]')
+ if argv[0] == __file__:
+ argv = argv[1:]
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-j', '--job_id', type=str,
+ help='The job id to query metrics for.')
+ parser.add_argument('-p', '--project', type=str,
+ help='The project name to query metrics for.')
+ flags = parser.parse_args(argv)
+
+ # Get a Dataflow API client and set its project and job_id in the options.
+ options = PipelineOptions()
+ gcloud_options = options.view_as(GoogleCloudOptions)
+ gcloud_options.project = flags.project
+ dataflow_client = apiclient.DataflowApplicationClient(options)
+ df_metrics = DataflowMetrics(dataflow_client)
+ all_metrics = df_metrics.all_metrics(job_id=flags.job_id)
+ logging.info('Printing all MetricResults for %s in %s',
+ flags.job_id, flags.project)
+ for metric_result in all_metrics:
+ logging.info(metric_result)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ main(sys.argv)
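The main() added above can be driven directly to dump the MetricResults of an already-submitted job, which takes seconds rather than the minutes a full pipeline run needs. A minimal sketch follows; the job id and project are placeholders, and credentials for the project plus the apache_beam[gcp] extra are assumed to be available.

    import logging

    from apache_beam.runners.dataflow import dataflow_metrics

    logging.getLogger().setLevel(logging.INFO)
    # Logs every MetricResult for the given Dataflow job and project.
    dataflow_metrics.main(['--job_id', '<JOB_ID>', '--project', '<PROJECT>'])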
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index cdce02c..9899176 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -340,7 +340,7 @@ class TestDataflowMetrics(unittest.TestCase):
MetricResult(
MetricKey('split',
MetricName('__main__.WordExtractingDoFn', 'word_lengths'),
- labels={'step': 's2'}),
+ ),
109475, 109475),
]
self.assertEqual(query_result['counters'], expected_counters)
@@ -350,7 +350,7 @@ class TestDataflowMetrics(unittest.TestCase):
MetricKey('split',
MetricName('__main__.WordExtractingDoFn',
'word_length_dist'),
- labels={'step': 's2'}),
+ ),
DistributionResult(DistributionData(
18, 2, 2, 16)),
DistributionResult(DistributionData(
@@ -367,14 +367,12 @@ class TestDataflowMetrics(unittest.TestCase):
expected_counters = [
MetricResult(
MetricKey('split',
- MetricName('__main__.WordExtractingDoFn', 'empty_lines'),
- labels={'step': 's2'}
+ MetricName('__main__.WordExtractingDoFn', 'empty_lines')
),
1080, 1080),
MetricResult(
MetricKey('split',
- MetricName('__main__.WordExtractingDoFn', 'words'),
- labels={'step': 's2'}),
+ MetricName('__main__.WordExtractingDoFn', 'words')),
26181, 26185),
]
self.assertEqual(sorted(query_result['counters'],