You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/05 20:42:56 UTC
[1/2] beam git commit: Metrics are queriable from Python SDK
Repository: beam
Updated Branches:
refs/heads/master 0490d6b36 -> da164a3fb
Metrics are queriable from Python SDK
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0afb140d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0afb140d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0afb140d
Branch: refs/heads/master
Commit: 0afb140daaeaabded6a9873d4ab5710a400b2125
Parents: 0490d6b
Author: Pablo <pa...@google.com>
Authored: Wed May 3 15:50:15 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri May 5 13:41:25 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/datastore_wordcount.py | 13 +-
sdks/python/apache_beam/examples/wordcount.py | 7 +-
sdks/python/apache_beam/metrics/cells.py | 16 +-
sdks/python/apache_beam/metrics/execution.py | 2 +-
.../runners/dataflow/dataflow_metrics.py | 114 ++++++++---
.../runners/dataflow/dataflow_metrics_test.py | 197 +++++++++++++------
6 files changed, 253 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 6e7ad76..d694cda 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -88,6 +88,7 @@ class WordExtractingDoFn(beam.DoFn):
self.empty_line_counter = Metrics.counter('main', 'empty_lines')
self.word_length_counter = Metrics.counter('main', 'word_lengths')
self.word_counter = Metrics.counter('main', 'total_words')
+ self.word_lengths_dist = Metrics.distribution('main', 'word_len_dist')
def process(self, element):
"""Returns an iterator over words in contents of Cloud Datastore entity.
@@ -107,6 +108,7 @@ class WordExtractingDoFn(beam.DoFn):
words = re.findall(r'[A-Za-z\']+', text_line)
for w in words:
self.word_length_counter.inc(len(w))
+ self.word_len_dis.update(len(w))
self.word_counter.inc()
return words
@@ -254,7 +256,16 @@ def run(argv=None):
if query_result['counters']:
empty_lines_counter = query_result['counters'][0]
logging.info('number of empty lines: %d', empty_lines_counter.committed)
- # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
+ else:
+ logging.warn('unable to retrieve counter metrics from runner')
+
+ word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+ query_result = result.metrics().query(word_lengths_filter)
+ if query_result['distributions']:
+ word_lengths_dist = query_result['distributions'][0]
+ logging.info('average word length: %d', word_lengths_dist.committed.mean)
+ else:
+ logging.warn('unable to retrieve distribution metrics from runner')
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e93fd2b..adcc4db 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -117,7 +117,12 @@ def run(argv=None):
if query_result['counters']:
empty_lines_counter = query_result['counters'][0]
logging.info('number of empty lines: %d', empty_lines_counter.committed)
- # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
+
+ word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+ query_result = result.metrics().query(word_lengths_filter)
+ if query_result['distributions']:
+ word_lengths_dist = query_result['distributions'][0]
+ logging.info('average word length: %d', word_lengths_dist.committed.mean)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index c421949..41d24be 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -193,6 +193,13 @@ class DistributionResult(object):
def __eq__(self, other):
return self.data == other.data
+ def __repr__(self):
+ return '<DistributionResult(sum={}, count={}, min={}, max={})>'.format(
+ self.sum,
+ self.count,
+ self.min,
+ self.max)
+
@property
def max(self):
return self.data.max
@@ -244,10 +251,11 @@ class DistributionData(object):
return not self.__eq__(other)
def __repr__(self):
- return '<DistributionData({}, {}, {}, {})>'.format(self.sum,
- self.count,
- self.min,
- self.max)
+ return '<DistributionData(sum={}, count={}, min={}, max={})>'.format(
+ self.sum,
+ self.count,
+ self.min,
+ self.max)
def get_cumulative(self):
return DistributionData(self.sum, self.count, self.min, self.max)
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 887423b..dbd0533 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -96,7 +96,7 @@ class MetricResult(object):
def __str__(self):
return 'MetricResult(key={}, committed={}, attempted={})'.format(
- self.key, self.committed, self.attempted)
+ self.key, str(self.committed), str(self.attempted))
class _MetricsEnvironment(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index fe0122f..24916fd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -22,14 +22,31 @@ service.
"""
from collections import defaultdict
-from warnings import warn
+import numbers
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
+def _get_match(proto, filter_fn):
+ """Finds and returns the first element that matches a query.
+
+ If no element matches the query, it throws ValueError.
+ If more than one element matches the query, it returns only the first.
+ """
+ query = [elm for elm in proto if filter_fn(elm)]
+ if len(query) == 0:
+ raise ValueError('Could not find element')
+ elif len(query) > 1:
+ raise ValueError('Too many matches')
+
+ return query[0]
+
+
class DataflowMetrics(MetricResults):
"""Implementation of MetricResults class for the Dataflow runner."""
@@ -51,18 +68,25 @@ class DataflowMetrics(MetricResults):
self._cached_metrics = None
self._job_graph = job_graph
+ @staticmethod
+ def _is_counter(metric_result):
+ return isinstance(metric_result.attempted, numbers.Number)
+
+ @staticmethod
+ def _is_distribution(metric_result):
+ return isinstance(metric_result.attempted, DistributionResult)
+
def _translate_step_name(self, internal_name):
"""Translate between internal step names (e.g. "s1") and user step names."""
if not self._job_graph:
raise ValueError('Could not translate the internal step name.')
try:
- [step] = [step
- for step in self._job_graph.proto.steps
- if step.name == internal_name]
- [user_step_name] = [prop.value.string_value
- for prop in step.properties.additionalProperties
- if prop.key == 'user_name']
+ step = _get_match(self._job_graph.proto.steps,
+ lambda x: x.name == internal_name)
+ user_step_name = _get_match(
+ step.properties.additionalProperties,
+ lambda x: x.key == 'user_name').value.string_value
except ValueError:
raise ValueError('Could not translate the internal step name.')
return user_step_name
@@ -77,19 +101,15 @@ class DataflowMetrics(MetricResults):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
- [step] = [prop.value
- for prop in metric.name.context.additionalProperties
- if prop.key == 'step']
+ step = _get_match(metric.name.context.additionalProperties,
+ lambda x: x.key == 'step').value
step = self._translate_step_name(step)
- [namespace] = [prop.value
- for prop in metric.name.context.additionalProperties
- if prop.key == 'namespace']
+ namespace = _get_match(metric.name.context.additionalProperties,
+ lambda x: x.key == 'namespace').value
name = metric.name.name
except ValueError:
- # An unstructured metric name is "step/namespace/name", but step names
- # can (and often do) contain slashes. Must only split on the right-most
- # two slashes, to preserve the full step name.
- [step, namespace, name] = metric.name.name.rsplit('/', 2)
+ return None
+
return MetricKey(step, MetricName(namespace, name))
def _populate_metric_results(self, response):
@@ -101,14 +121,17 @@ class DataflowMetrics(MetricResults):
# Get the tentative/committed versions of every metric together.
metrics_by_name = defaultdict(lambda: {})
for metric in user_metrics:
- if (metric.name.name.endswith('(DIST)') or
- metric.name.name.endswith('[MIN]') or
+ if (metric.name.name.endswith('[MIN]') or
metric.name.name.endswith('[MAX]') or
metric.name.name.endswith('[MEAN]') or
metric.name.name.endswith('[COUNT]')):
- warn('Distribution metrics will be ignored in the MetricsResult.query'
- 'method. You can see them in the Dataflow User Interface.')
- # Distributions are not yet fully supported in this runner
+ # The Dataflow Service presents distribution metrics in two ways:
+ # One way is as a single distribution object with all its fields, and
+ # another way is as four different scalar metrics labeled as [MIN],
+ # [MAX], [COUNT], [MEAN].
+ # TODO(pabloem) remove these when distributions are not being broken up
+ # in the service.
+ # The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [prop
for prop in metric.name.context.additionalProperties
@@ -116,22 +139,49 @@ class DataflowMetrics(MetricResults):
tentative_or_committed = 'tentative' if is_tentative else 'committed'
metric_key = self._get_metric_key(metric)
+ if metric_key is None:
+ continue
metrics_by_name[metric_key][tentative_or_committed] = metric
# Now we create the MetricResult elements.
result = []
for metric_key, metric in metrics_by_name.iteritems():
- if (metric['tentative'].scalar is None or
- metric['committed'].scalar is None):
+ attempted = self._get_metric_value(metric['tentative'])
+ committed = self._get_metric_value(metric['committed'])
+ if attempted is None or committed is None:
continue
- attempted = metric['tentative'].scalar.integer_value
- committed = metric['committed'].scalar.integer_value
result.append(MetricResult(metric_key,
attempted=attempted,
committed=committed))
return result
+ def _get_metric_value(self, metric):
+ """Get a metric result object from a MetricUpdate from Dataflow API."""
+ if metric is None:
+ return None
+
+ if metric.scalar is not None:
+ return metric.scalar.integer_value
+ elif metric.distribution is not None:
+ dist_count = _get_match(metric.distribution.object_value.properties,
+ lambda x: x.key == 'count').value.integer_value
+ dist_min = _get_match(metric.distribution.object_value.properties,
+ lambda x: x.key == 'min').value.integer_value
+ dist_max = _get_match(metric.distribution.object_value.properties,
+ lambda x: x.key == 'max').value.integer_value
+ dist_mean = _get_match(metric.distribution.object_value.properties,
+ lambda x: x.key == 'mean').value.integer_value
+ # Calculating dist_sum with a hack, as distribution sum is not yet
+ # available in the Dataflow API.
+ # TODO(pabloem) Switch to "sum" field once it's available in the API
+ dist_sum = dist_count * dist_mean
+ return DistributionResult(
+ DistributionData(
+ dist_sum, dist_count, dist_min, dist_max))
+ else:
+ return None
+
def _get_metrics_from_dataflow(self):
"""Return cached metrics or query the dataflow service."""
try:
@@ -152,7 +202,11 @@ class DataflowMetrics(MetricResults):
def query(self, filter=None):
response = self._get_metrics_from_dataflow()
- counters = self._populate_metric_results(response)
- # TODO(pabloem): Populate distributions once they are available.
- return {'counters': [c for c in counters if self.matches(filter, c.key)],
- 'distributions': []}
+ metric_results = self._populate_metric_results(response)
+ return {'counters': [elm for elm in metric_results
+ if self.matches(filter, elm.key)
+ and DataflowMetrics._is_counter(elm)],
+ 'distributions': [elm for elm in metric_results
+ if self.matches(filter, elm.key)
+ and DataflowMetrics._is_distribution(elm)],
+ 'gauges': []} # Gauges are not currently supported by dataflow
http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 51a186d..dd3cbe1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -23,6 +23,8 @@ import unittest
import mock
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metricbase import MetricName
@@ -43,7 +45,7 @@ class DictToObject(object):
class TestDataflowMetrics(unittest.TestCase):
- STRUCTURED_COUNTER_LIST = {"metrics": [
+ ONLY_COUNTERS_LIST = {"metrics": [
{"name": {"context":
{"additionalProperties": [
{"key": "namespace",
@@ -53,10 +55,11 @@ class TestDataflowMetrics(unittest.TestCase):
{"key": "tentative",
"value": "true"}]
},
- "name": "word_lengths",
+ "name": "words",
"origin": "user"
},
- "scalar": {"integer_value": 109475},
+ "scalar": {"integer_value": 26185},
+ "distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
{"name": {"context":
@@ -66,76 +69,137 @@ class TestDataflowMetrics(unittest.TestCase):
{"key": "step",
"value": "s2"}]
},
- "name": "word_lengths",
+ "name": "words",
"origin": "user"
},
- "scalar": {"integer_value": 109475},
+ "scalar": {"integer_value": 26181},
+ "distribution": None,
"updateTime": "2017-03-22T18:47:06.402Z"
},
- ]}
-
- BASIC_COUNTER_LIST = {"metrics": [
{"name": {"context":
- {"additionalProperties":[
- {"key": "original_name",
- "value": "user-split-split/__main__.WordExtractingDoFn/"
- "empty_lines_TentativeAggregateValue"},
- {"key": "step", "value": "split"}]},
- "name": "split/__main__.WordExtractingDoFn/empty_lines",
- "origin": "user"},
+ {"additionalProperties": [
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"},
+ {"key": "tentative",
+ "value": "true"}]
+ },
+ "name": "empty_lines",
+ "origin": "user"
+ },
"scalar": {"integer_value": 1080},
- "updateTime": "2017-02-23T01:13:36.659Z"},
+ "distribution": None,
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
{"name": {"context":
{"additionalProperties": [
- {"key": "original_name",
- "value": "user-split-split/__main__.WordExtractingDoFn/"
- "empty_lines_TentativeAggregateValue"},
- {"key": "step", "value": "split"},
- {"key": "tentative", "value": "true"}]},
- "name": "split/__main__.WordExtractingDoFn/empty_lines",
- "origin": "user"},
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"}]
+ },
+ "name": "empty_lines",
+ "origin": "user"
+ },
"scalar": {"integer_value": 1080},
- "updateTime": "2017-02-23T01:13:36.659Z"},
+ "distribution": None,
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
+ ]}
+ STRUCTURED_COUNTER_LIST = {"metrics": [
{"name": {"context":
{"additionalProperties": [
- {"key": "original_name",
- "value": "user-split-split/__main__.WordExtractingDoFn/"
- "words_TentativeAggregateValue"},
- {"key": "step", "value": "split"}]},
- "name": "longstepname/split/__main__.WordExtractingDoFn/words",
- "origin": "user"},
- "scalar": {"integer_value": 26181},
- "updateTime": "2017-02-23T01:13:36.659Z"},
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"},
+ {"key": "tentative",
+ "value": "true"}]
+ },
+ "name": "word_lengths",
+ "origin": "user"
+ },
+ "scalar": {"integer_value": 109475},
+ "distribution": None,
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
{"name": {"context":
{"additionalProperties": [
- {"key": "original_name",
- "value": "user-split-longstepname/split/"
- "__main__.WordExtractingDoFn/"
- "words_TentativeAggregateValue"},
- {"key": "step", "value": "split"},
- {"key": "tentative", "value": "true"}]},
- "name": "longstepname/split/__main__.WordExtractingDoFn/words",
- "origin": "user"},
- "scalar": {"integer_value": 26185},
- "updateTime": "2017-02-23T01:13:36.659Z"},
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"}]
+ },
+ "name": "word_lengths",
+ "origin": "user"
+ },
+ "scalar": {"integer_value": 109475},
+ "distribution": None,
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
{"name": {"context":
{"additionalProperties": [
- {"key": "original_name",
- "value": "user-split-split/__main__.WordExtractingDoFn/"
- "secretdistribution(DIST)"},
- {"key": "step", "value": "split"},
- {"key": "tentative", "value": "true"}]},
- "name":
- "split/__main__.WordExtractingDoFn/secretdistribution(DIST)",
- "origin": "user"},
- "scalar": {"integer_value": 15},
- "updateTime": "2017-02-23T01:13:36.659Z"}
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"},
+ {"key": "tentative",
+ "value": "true"}]
+ },
+ "name": "word_length_dist",
+ "origin": "user"
+ },
+ "scalar": None,
+ "distribution": {
+ "object_value": {
+ "properties": [
+ {"key": "min", "value":
+ {"integer_value": 2}},
+ {"key": "max", "value":
+ {"integer_value": 16}},
+ {"key": "count", "value":
+ {"integer_value": 2}},
+ {"key": "mean", "value":
+ {"integer_value": 9}},
+ {"key": "sum", "value":
+ {"integer_value": 18}},]
+ }
+ },
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
+ {"name": {"context":
+ {"additionalProperties": [
+ {"key": "namespace",
+ "value": "__main__.WordExtractingDoFn"},
+ {"key": "step",
+ "value": "s2"}]
+ },
+ "name": "word_length_dist",
+ "origin": "user"
+ },
+ "scalar": None,
+ "distribution": {
+ "object_value": {
+ "properties": [
+ {"key": "min", "value":
+ {"integer_value": 2}},
+ {"key": "max", "value":
+ {"integer_value": 16}},
+ {"key": "count", "value":
+ {"integer_value": 2}},
+ {"key": "mean", "value":
+ {"integer_value": 9}},
+ {"key": "sum", "value":
+ {"integer_value": 18}},
+ ]
+ }
+ },
+ "updateTime": "2017-03-22T18:47:06.402Z"
+ },
]}
def setup_mock_client_result(self, counter_list=None):
- if counter_list is None:
- counter_list = self.BASIC_COUNTER_LIST
-
mock_client = mock.Mock()
mock_query_result = DictToObject(counter_list)
mock_client.get_job_metrics.return_value = mock_query_result
@@ -145,7 +209,8 @@ class TestDataflowMetrics(unittest.TestCase):
return mock_client, mock_job_result
def test_cache_functions(self):
- mock_client, mock_job_result = self.setup_mock_client_result()
+ mock_client, mock_job_result = self.setup_mock_client_result(
+ self.STRUCTURED_COUNTER_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
# At first creation, we should always query dataflow.
@@ -160,7 +225,7 @@ class TestDataflowMetrics(unittest.TestCase):
dm.query()
self.assertTrue(dm._cached_metrics)
- def test_query_structured_counters(self):
+ def test_query_structured_metrics(self):
mock_client, mock_job_result = self.setup_mock_client_result(
self.STRUCTURED_COUNTER_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
@@ -175,9 +240,23 @@ class TestDataflowMetrics(unittest.TestCase):
]
self.assertEqual(query_result['counters'], expected_counters)
+ expected_distributions = [
+ MetricResult(
+ MetricKey('split',
+ MetricName('__main__.WordExtractingDoFn',
+ 'word_length_dist')),
+ DistributionResult(DistributionData(
+ 18, 2, 2, 16)),
+ DistributionResult(DistributionData(
+ 18, 2, 2, 16))),
+ ]
+ self.assertEqual(query_result['distributions'], expected_distributions)
+
def test_query_counters(self):
- mock_client, mock_job_result = self.setup_mock_client_result()
+ mock_client, mock_job_result = self.setup_mock_client_result(
+ self.ONLY_COUNTERS_LIST)
dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+ dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
query_result = dm.query()
expected_counters = [
MetricResult(
@@ -185,7 +264,7 @@ class TestDataflowMetrics(unittest.TestCase):
MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
1080, 1080),
MetricResult(
- MetricKey('longstepname/split',
+ MetricKey('split',
MetricName('__main__.WordExtractingDoFn', 'words')),
26181, 26185),
]
[2/2] beam git commit: This closes #2874
Posted by al...@apache.org.
This closes #2874
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da164a3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da164a3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da164a3f
Branch: refs/heads/master
Commit: da164a3fbc5d057ee5abacba3ec8a0fe05d006c8
Parents: 0490d6b 0afb140
Author: Ahmet Altay <al...@google.com>
Authored: Fri May 5 13:42:44 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri May 5 13:42:44 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/datastore_wordcount.py | 13 +-
sdks/python/apache_beam/examples/wordcount.py | 7 +-
sdks/python/apache_beam/metrics/cells.py | 16 +-
sdks/python/apache_beam/metrics/execution.py | 2 +-
.../runners/dataflow/dataflow_metrics.py | 114 ++++++++---
.../runners/dataflow/dataflow_metrics_test.py | 197 +++++++++++++------
6 files changed, 253 insertions(+), 96 deletions(-)
----------------------------------------------------------------------