You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that did not survive plain-text conversion.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/28 00:33:12 UTC
[1/2] beam git commit: Querying of both structured and unstructured metrics in dataflow.
Repository: beam
Updated Branches:
refs/heads/master 6d627534b -> e58155263
Querying of both structured and unstructured metrics in dataflow.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b98bde91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b98bde91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b98bde91
Branch: refs/heads/master
Commit: b98bde912c2b2dbcb0ce1d30f6af32b7219d831d
Parents: 6d62753
Author: Pablo <pa...@google.com>
Authored: Wed Mar 22 13:22:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 17:32:32 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_metrics.py | 86 +++++++++++++++-----
.../runners/dataflow/dataflow_metrics_test.py | 63 ++++++++++++--
.../runners/dataflow/dataflow_runner.py | 2 +-
3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index db5a2bc..f90e3d5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -30,25 +30,67 @@ from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
-# TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from
-# dataflow service
class DataflowMetrics(MetricResults):
"""Implementation of MetricResults class for the Dataflow runner."""
- def __init__(self, dataflow_client=None, job_result=None):
+ def __init__(self, dataflow_client=None, job_result=None, job_graph=None):
"""Initialize the Dataflow metrics object.
Args:
dataflow_client: apiclient.DataflowApplicationClient to interact with the
dataflow service.
job_result: DataflowPipelineResult with the state and id information of
- the job
+ the job.
+ job_graph: apiclient.Job instance to be able to translate between internal
+ step names (e.g. "s2"), and user step names (e.g. "split").
"""
super(DataflowMetrics, self).__init__()
self._dataflow_client = dataflow_client
self.job_result = job_result
self._queried_after_termination = False
self._cached_metrics = None
+ self._job_graph = job_graph
+
+ def _translate_step_name(self, internal_name):
+ """Translate between internal step names (e.g. "s1") and user step names."""
+ if not self._job_graph:
+ raise ValueError('Could not translate the internal step name.')
+
+ try:
+ [step] = [step
+ for step in self._job_graph.proto.steps
+ if step.name == internal_name]
+ [user_step_name] = [prop.value.string_value
+ for prop in step.properties.additionalProperties
+ if prop.key == 'user_name']
+ except ValueError:
+ raise ValueError('Could not translate the internal step name.')
+ return user_step_name
+
+ def _get_metric_key(self, metric):
+ """Populate the MetricKey object for a queried metric result."""
+ try:
+ # If ValueError is thrown within this try-block, it is because of
+ # one of the following:
+ # 1. Unable to translate the step name. Only happening with improperly
+ # formatted job graph (unlikely), or step name not being the internal
+ # step name (only happens for unstructured-named metrics).
+ # 2. Unable to unpack [step] or [namespace]; which should only happen
+ # for unstructured names.
+ [step] = [prop.value
+ for prop in metric.name.context.additionalProperties
+ if prop.key == 'step']
+ step = self._translate_step_name(step)
+ [namespace] = [prop.value
+ for prop in metric.name.context.additionalProperties
+ if prop.key == 'namespace']
+ name = metric.name.name
+ except ValueError:
+ # An unstructured metric name is "step/namespace/name", but step names
+ # can (and often do) contain slashes. Must only split on the right-most
+ # two slashes, to preserve the full step name.
+ [step, namespace, name] = metric.name.name.rsplit('/', 2)
+ return MetricKey(step, MetricName(namespace, name))
def _populate_metric_results(self, response):
"""Take a list of metrics, and convert it to a list of MetricResult."""
@@ -59,29 +101,31 @@ class DataflowMetrics(MetricResults):
# Get the tentative/committed versions of every metric together.
metrics_by_name = defaultdict(lambda: {})
for metric in user_metrics:
- tentative = [prop
- for prop in metric.name.context.additionalProperties
- if prop.key == 'tentative' and prop.value == 'true']
- key = 'tentative' if tentative else 'committed'
- metrics_by_name[metric.name.name][key] = metric
-
- # Now we create the MetricResult elements.
- result = []
- for name, metric in metrics_by_name.iteritems():
- if (name.endswith('(DIST)') or
- name.endswith('[MIN]') or
- name.endswith('[MAX]') or
- name.endswith('[MEAN]') or
- name.endswith('[COUNT]')):
+ if (metric.name.name.endswith('(DIST)') or
+ metric.name.name.endswith('[MIN]') or
+ metric.name.name.endswith('[MAX]') or
+ metric.name.name.endswith('[MEAN]') or
+ metric.name.name.endswith('[COUNT]')):
warn('Distribution metrics will be ignored in the MetricsResult.query'
'method. You can see them in the Dataflow User Interface.')
# Distributions are not yet fully supported in this runner
continue
- [step, namespace, name] = name.split('/')
- key = MetricKey(step, MetricName(namespace, name))
+ is_tentative = [prop
+ for prop in metric.name.context.additionalProperties
+ if prop.key == 'tentative' and prop.value == 'true']
+ tentative_or_committed = 'tentative' if is_tentative else 'committed'
+
+ metric_key = self._get_metric_key(metric)
+ metrics_by_name[metric_key][tentative_or_committed] = metric
+
+ # Now we create the MetricResult elements.
+ result = []
+ for metric_key, metric in metrics_by_name.iteritems():
attempted = metric['tentative'].scalar.integer_value
committed = metric['committed'].scalar.integer_value
- result.append(MetricResult(key, attempted=attempted, committed=committed))
+ result.append(MetricResult(metric_key,
+ attempted=attempted,
+ committed=committed))
return result
http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 8d18fae..95027a3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -18,6 +18,7 @@
Tests corresponding to the DataflowRunner implementation of MetricsResult,
the DataflowMetrics class.
"""
+import types
import unittest
import mock
@@ -43,6 +44,37 @@ class DictToObject(object):
class TestDataflowMetrics(unittest.TestCase):
+ STRUCTURED_COUNTER_LIST = {"metrics": [
+ {"name": {"context":
+ {"additionalProperties": [
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"},
+ {"key": "tentative",
+ "value": "true"}]
+ },
+ "name": "word_lengths",
+ "origin": "user"
+ },
+ "scalar": {"integer_value": 109475},
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
+ {"name": {"context":
+ {"additionalProperties": [
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"}]
+ },
+ "name": "word_lengths",
+ "origin": "user"
+ },
+ "scalar": {"integer_value": 109475},
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
+ ]}
+
BASIC_COUNTER_LIST = {"metrics": [
{"name": {"context":
{"additionalProperties":[
@@ -71,18 +103,19 @@ class TestDataflowMetrics(unittest.TestCase):
"value": "user-split-split/__main__.WordExtractingDoFn/"
"words_TentativeAggregateValue"},
{"key": "step", "value": "split"}]},
- "name": "split/__main__.WordExtractingDoFn/words",
+ "name": "longstepname/split/__main__.WordExtractingDoFn/words",
"origin": "user"},
"scalar": {"integer_value": 26181},
"updateTime": "2017-02-23T01:13:36.659Z"},
{"name": {"context":
{"additionalProperties": [
{"key": "original_name",
- "value": "user-split-split/__main__.WordExtractingDoFn/"
+ "value": "user-split-longstepname/split/"
+ "__main__.WordExtractingDoFn/"
"words_TentativeAggregateValue"},
{"key": "step", "value": "split"},
{"key": "tentative", "value": "true"}]},
- "name": "split/__main__.WordExtractingDoFn/words",
+ "name": "longstepname/split/__main__.WordExtractingDoFn/words",
"origin": "user"},
"scalar": {"integer_value": 26185},
"updateTime": "2017-02-23T01:13:36.659Z"},
@@ -100,9 +133,12 @@ class TestDataflowMetrics(unittest.TestCase):
"updateTime": "2017-02-23T01:13:36.659Z"}
]}
- def setup_mock_client_result(self):
+ def setup_mock_client_result(self, counter_list=None):
+ if counter_list is None:
+ counter_list = self.BASIC_COUNTER_LIST
+
mock_client = mock.Mock()
- mock_query_result = DictToObject(self.BASIC_COUNTER_LIST)
+ mock_query_result = DictToObject(counter_list)
mock_client.get_job_metrics.return_value = mock_query_result
mock_job_result = mock.Mock()
mock_job_result.job_id.return_value = 1
@@ -125,6 +161,21 @@ class TestDataflowMetrics(unittest.TestCase):
dm.query()
self.assertTrue(dm._cached_metrics)
+ def test_query_structured_counters(self):
+ mock_client, mock_job_result = self.setup_mock_client_result(
+ self.STRUCTURED_COUNTER_LIST)
+ dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+ dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
+ query_result = dm.query()
+ expected_counters = [
+ MetricResult(
+ MetricKey('split',
+ MetricName('__main__.WordExtractingDoFn',
+ 'word_lengths')),
+ 109475, 109475),
+ ]
+ self.assertEqual(query_result['counters'], expected_counters)
+
def test_query_counters(self):
mock_client, mock_job_result = self.setup_mock_client_result()
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
@@ -135,7 +186,7 @@ class TestDataflowMetrics(unittest.TestCase):
MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
1080, 1080),
MetricResult(
- MetricKey('split',
+ MetricKey('longstepname/split',
MetricName('__main__.WordExtractingDoFn', 'words')),
26181, 26185),
]
http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 25f2fd4..bd29d63 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -174,7 +174,7 @@ class DataflowRunner(PipelineRunner):
result = DataflowPipelineResult(
self.dataflow_client.create_job(self.job), self)
- self._metrics = DataflowMetrics(self.dataflow_client, result)
+ self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
result.metric_results = self._metrics
return result
[2/2] beam git commit: This closes #2290
Posted by al...@apache.org.
This closes #2290
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5815526
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5815526
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5815526
Branch: refs/heads/master
Commit: e58155263ecff4b5533fda0033da4d4cecb66c5e
Parents: 6d62753 b98bde9
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 17:32:59 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 17:32:59 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_metrics.py | 86 +++++++++++++++-----
.../runners/dataflow/dataflow_metrics_test.py | 63 ++++++++++++--
.../runners/dataflow/dataflow_runner.py | 2 +-
3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------