Posted to commits@beam.apache.org by al...@apache.org on 2017/03/28 00:33:12 UTC

[1/2] beam git commit: Querying of both structured and unstructured metrics in dataflow.

Repository: beam
Updated Branches:
  refs/heads/master 6d627534b -> e58155263


Querying of both structured and unstructured metrics in dataflow.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b98bde91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b98bde91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b98bde91

Branch: refs/heads/master
Commit: b98bde912c2b2dbcb0ce1d30f6af32b7219d831d
Parents: 6d62753
Author: Pablo <pa...@google.com>
Authored: Wed Mar 22 13:22:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 17:32:32 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_metrics.py        | 86 +++++++++++++++-----
 .../runners/dataflow/dataflow_metrics_test.py   | 63 ++++++++++++--
 .../runners/dataflow/dataflow_runner.py         |  2 +-
 3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
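
Note (illustration, not part of the commit): "structured" metrics carry their step and namespace as context properties, with the step given as an internal Dataflow name (e.g. "s2") that must be mapped back to the user step name via the job graph, while older "unstructured" metrics pack everything into a single "step/namespace/name" string. A minimal, hypothetical sketch of deriving a metric key from either shape follows; the function name, the dict shapes, and the fake step map are assumptions made for the example.

def metric_key_from(name, context_properties, step_map):
    """Return (step, namespace, name) for a structured or unstructured metric."""
    if 'step' in context_properties and 'namespace' in context_properties:
        # Structured name: step/namespace travel as context properties, and the
        # step is an internal id (e.g. "s2") mapped to a user step name.
        step = step_map[context_properties['step']]
        return step, context_properties['namespace'], name
    # Unstructured name: "step/namespace/name", where the step itself may
    # contain slashes, so split only on the right-most two.
    step, namespace, metric_name = name.rsplit('/', 2)
    return step, namespace, metric_name

print(metric_key_from('word_lengths',
                      {'step': 's2',
                       'namespace': '__main__.WordExtractingDoFn'},
                      {'s2': 'split'}))
print(metric_key_from('longstepname/split/__main__.WordExtractingDoFn/words',
                      {}, {}))
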


http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index db5a2bc..f90e3d5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -30,25 +30,67 @@ from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metricbase import MetricName
 
 
-# TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from
-# dataflow service
 class DataflowMetrics(MetricResults):
   """Implementation of MetricResults class for the Dataflow runner."""
 
-  def __init__(self, dataflow_client=None, job_result=None):
+  def __init__(self, dataflow_client=None, job_result=None, job_graph=None):
     """Initialize the Dataflow metrics object.
 
     Args:
       dataflow_client: apiclient.DataflowApplicationClient to interact with the
         dataflow service.
       job_result: DataflowPipelineResult with the state and id information of
-        the job
+        the job.
+      job_graph: apiclient.Job instance used to translate between internal
+        step names (e.g. "s2") and user step names (e.g. "split").
     """
     super(DataflowMetrics, self).__init__()
     self._dataflow_client = dataflow_client
     self.job_result = job_result
     self._queried_after_termination = False
     self._cached_metrics = None
+    self._job_graph = job_graph
+
+  def _translate_step_name(self, internal_name):
+    """Translate between internal step names (e.g. "s1") and user step names."""
+    if not self._job_graph:
+      raise ValueError('Could not translate the internal step name.')
+
+    try:
+      [step] = [step
+                for step in self._job_graph.proto.steps
+                if step.name == internal_name]
+      [user_step_name] = [prop.value.string_value
+                          for prop in step.properties.additionalProperties
+                          if prop.key == 'user_name']
+    except ValueError:
+      raise ValueError('Could not translate the internal step name.')
+    return user_step_name
+
+  def _get_metric_key(self, metric):
+    """Populate the MetricKey object for a queried metric result."""
+    try:
+      # If a ValueError is raised within this try-block, it is because of
+      # one of the following:
+      # 1. The step name could not be translated. This only happens with an
+      #   improperly formatted job graph (unlikely), or when the step name is
+      #   not the internal step name (only for unstructured-named metrics).
+      # 2. [step] or [namespace] could not be unpacked, which should only
+      #   happen for unstructured names.
+      [step] = [prop.value
+                for prop in metric.name.context.additionalProperties
+                if prop.key == 'step']
+      step = self._translate_step_name(step)
+      [namespace] = [prop.value
+                     for prop in metric.name.context.additionalProperties
+                     if prop.key == 'namespace']
+      name = metric.name.name
+    except ValueError:
+      # An unstructured metric name is "step/namespace/name", but step names
+      # can (and often do) contain slashes. Must only split on the right-most
+      # two slashes, to preserve the full step name.
+      [step, namespace, name] = metric.name.name.rsplit('/', 2)
+    return MetricKey(step, MetricName(namespace, name))
 
   def _populate_metric_results(self, response):
     """Take a list of metrics, and convert it to a list of MetricResult."""
@@ -59,29 +101,31 @@ class DataflowMetrics(MetricResults):
     # Get the tentative/committed versions of every metric together.
     metrics_by_name = defaultdict(lambda: {})
     for metric in user_metrics:
-      tentative = [prop
-                   for prop in metric.name.context.additionalProperties
-                   if prop.key == 'tentative' and prop.value == 'true']
-      key = 'tentative' if tentative else 'committed'
-      metrics_by_name[metric.name.name][key] = metric
-
-    # Now we create the MetricResult elements.
-    result = []
-    for name, metric in metrics_by_name.iteritems():
-      if (name.endswith('(DIST)') or
-          name.endswith('[MIN]') or
-          name.endswith('[MAX]') or
-          name.endswith('[MEAN]') or
-          name.endswith('[COUNT]')):
+      if (metric.name.name.endswith('(DIST)') or
+          metric.name.name.endswith('[MIN]') or
+          metric.name.name.endswith('[MAX]') or
+          metric.name.name.endswith('[MEAN]') or
+          metric.name.name.endswith('[COUNT]')):
         warn('Distribution metrics will be ignored in the MetricsResult.query'
              ' method. You can see them in the Dataflow User Interface.')
         # Distributions are not yet fully supported in this runner
         continue
-      [step, namespace, name] = name.split('/')
-      key = MetricKey(step, MetricName(namespace, name))
+      is_tentative = [prop
+                      for prop in metric.name.context.additionalProperties
+                      if prop.key == 'tentative' and prop.value == 'true']
+      tentative_or_committed = 'tentative' if is_tentative else 'committed'
+
+      metric_key = self._get_metric_key(metric)
+      metrics_by_name[metric_key][tentative_or_committed] = metric
+
+    # Now we create the MetricResult elements.
+    result = []
+    for metric_key, metric in metrics_by_name.iteritems():
       attempted = metric['tentative'].scalar.integer_value
       committed = metric['committed'].scalar.integer_value
-      result.append(MetricResult(key, attempted=attempted, committed=committed))
+      result.append(MetricResult(metric_key,
+                                 attempted=attempted,
+                                 committed=committed))
 
     return result
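
Note (illustration, not part of the commit): the "[x] = [...]" pattern used in _get_metric_key unpacks exactly one element and raises ValueError for zero or multiple matches, which is what the except clause relies on to fall back to the unstructured-name path. A small standalone sketch with a made-up property list:

props = [('step', 's2'), ('namespace', '__main__.WordExtractingDoFn')]

# Exactly one match: unpacking succeeds.
[step] = [value for key, value in props if key == 'step']  # step == 's2'

# No match: the list is empty, so unpacking raises ValueError.
try:
    [missing] = [value for key, value in props if key == 'nope']
except ValueError:
    print('no match -> ValueError, handled by the unstructured fallback')
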
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 8d18fae..95027a3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -18,6 +18,7 @@
 Tests corresponding to the DataflowRunner implementation of MetricsResult,
 the DataflowMetrics class.
 """
+import types
 import unittest
 
 import mock
@@ -43,6 +44,37 @@ class DictToObject(object):
 
 class TestDataflowMetrics(unittest.TestCase):
 
+  STRUCTURED_COUNTER_LIST = {"metrics": [
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"},
+                    {"key": "tentative",
+                     "value": "true"}]
+                },
+                "name": "word_lengths",
+                "origin": "user"
+               },
+       "scalar": {"integer_value": 109475},
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"}]
+                },
+                "name": "word_lengths",
+                "origin": "user"
+               },
+       "scalar": {"integer_value": 109475},
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+  ]}
+
   BASIC_COUNTER_LIST = {"metrics": [
       {"name": {"context":
                 {"additionalProperties":[
@@ -71,18 +103,19 @@ class TestDataflowMetrics(unittest.TestCase):
                      "value": "user-split-split/__main__.WordExtractingDoFn/"
                               "words_TentativeAggregateValue"},
                     {"key": "step", "value": "split"}]},
-                "name": "split/__main__.WordExtractingDoFn/words",
+                "name": "longstepname/split/__main__.WordExtractingDoFn/words",
                 "origin": "user"},
        "scalar": {"integer_value": 26181},
        "updateTime": "2017-02-23T01:13:36.659Z"},
       {"name": {"context":
                 {"additionalProperties": [
                     {"key": "original_name",
-                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                     "value": "user-split-longstepname/split/"
+                              "__main__.WordExtractingDoFn/"
                               "words_TentativeAggregateValue"},
                     {"key": "step", "value": "split"},
                     {"key": "tentative", "value": "true"}]},
-                "name": "split/__main__.WordExtractingDoFn/words",
+                "name": "longstepname/split/__main__.WordExtractingDoFn/words",
                 "origin": "user"},
        "scalar": {"integer_value": 26185},
        "updateTime": "2017-02-23T01:13:36.659Z"},
@@ -100,9 +133,12 @@ class TestDataflowMetrics(unittest.TestCase):
        "updateTime": "2017-02-23T01:13:36.659Z"}
   ]}
 
-  def setup_mock_client_result(self):
+  def setup_mock_client_result(self, counter_list=None):
+    if counter_list is None:
+      counter_list = self.BASIC_COUNTER_LIST
+
     mock_client = mock.Mock()
-    mock_query_result = DictToObject(self.BASIC_COUNTER_LIST)
+    mock_query_result = DictToObject(counter_list)
     mock_client.get_job_metrics.return_value = mock_query_result
     mock_job_result = mock.Mock()
     mock_job_result.job_id.return_value = 1
@@ -125,6 +161,21 @@ class TestDataflowMetrics(unittest.TestCase):
     dm.query()
     self.assertTrue(dm._cached_metrics)
 
+  def test_query_structured_counters(self):
+    mock_client, mock_job_result = self.setup_mock_client_result(
+        self.STRUCTURED_COUNTER_LIST)
+    dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+    dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
+    query_result = dm.query()
+    expected_counters = [
+        MetricResult(
+            MetricKey('split',
+                      MetricName('__main__.WordExtractingDoFn',
+                                 'word_lengths')),
+            109475, 109475),
+        ]
+    self.assertEqual(query_result['counters'], expected_counters)
+
   def test_query_counters(self):
     mock_client, mock_job_result = self.setup_mock_client_result()
     dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
@@ -135,7 +186,7 @@ class TestDataflowMetrics(unittest.TestCase):
                       MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
             1080, 1080),
         MetricResult(
-            MetricKey('split',
+            MetricKey('longstepname/split',
                       MetricName('__main__.WordExtractingDoFn', 'words')),
             26181, 26185),
         ]
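
Note (illustration, not part of the commit): test_query_structured_counters stubs out _translate_step_name because the mocked job result carries no real job graph. For readers unfamiliar with the pattern, here is a minimal standalone sketch of binding a replacement method to a single instance with types.MethodType; the Example class is invented for illustration.

import types

class Example(object):
  def translate(self, internal_name):
    raise ValueError('no job graph available')

obj = Example()
# Bind a replacement method to this instance only; other instances keep the
# original behavior.
obj.translate = types.MethodType(lambda self, name: 'split', obj)
print(obj.translate('s2'))  # -> 'split'
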

http://git-wip-us.apache.org/repos/asf/beam/blob/b98bde91/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 25f2fd4..bd29d63 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -174,7 +174,7 @@ class DataflowRunner(PipelineRunner):
     result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job), self)
 
-    self._metrics = DataflowMetrics(self.dataflow_client, result)
+    self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
     result.metric_results = self._metrics
     return result
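
Note (hedged usage sketch, not part of the commit): with the job graph passed through, counters queried from a Dataflow pipeline result are keyed by user step names rather than internal ones. The pipeline below, its step label, and the omitted runner options are assumptions for illustration; the query()['counters'] shape matches the expectations in dataflow_metrics_test.py.

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter

class WordLengthDoFn(beam.DoFn):
  def __init__(self):
    # User counter; with this change its step is reported under the user
    # step name rather than the internal Dataflow step name.
    self.word_lengths = Metrics.counter(self.__class__, 'word_lengths')

  def process(self, element):
    self.word_lengths.inc(len(element))
    yield element

pipeline = beam.Pipeline()  # pass DataflowRunner options here as needed
_ = (pipeline
     | beam.Create(['cat', 'dog'])
     | 'split' >> beam.ParDo(WordLengthDoFn()))

result = pipeline.run()
result.wait_until_finish()

counters = result.metrics().query(
    MetricsFilter().with_name('word_lengths'))['counters']
for counter in counters:
  print(counter.key.step, counter.key.metric.namespace,
        counter.attempted, counter.committed)
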
 


[2/2] beam git commit: This closes #2290

Posted by al...@apache.org.
This closes #2290


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5815526
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5815526
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5815526

Branch: refs/heads/master
Commit: e58155263ecff4b5533fda0033da4d4cecb66c5e
Parents: 6d62753 b98bde9
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 17:32:59 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 17:32:59 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_metrics.py        | 86 +++++++++++++++-----
 .../runners/dataflow/dataflow_metrics_test.py   | 63 ++++++++++++--
 .../runners/dataflow/dataflow_runner.py         |  2 +-
 3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------