Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/14 13:22:31 UTC

[beam] branch master updated: Merge pull request #7936 Add matchers for python MetricResults, labels to MetricResults, dataflow system metrics

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 41d0074  Merge pull request #7936 Add matchers for python MetricResults, labels to MetricResults, dataflow system metrics
41d0074 is described below

commit 41d0074c4579a363fe5809c455913b67c0f9d2b9
Author: Alex Amato <aj...@google.com>
AuthorDate: Thu Mar 14 06:22:18 2019 -0700

    Merge pull request #7936 Add matchers for python MetricResults, labels to MetricResults, dataflow system metrics
    
    Add matchers for python MetricResults, add labels to MetricResults and populate them for dataflow system metrics.
    
    These matchers can be used for integration testing.
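
    A minimal sketch of such a test (assuming a pipeline `p` constructed
    elsewhere and run with the Dataflow runner; names are illustrative):

        from apache_beam.testing import metric_result_matchers
        from apache_beam.testing.metric_result_matchers import MetricResultMatcher

        result = p.run()
        result.wait_until_finish()
        all_metrics = result.metrics().all_metrics()
        matchers = [
            MetricResultMatcher(
                namespace='myNamespace',
                name='myName',
                attempted=42,
                committed=42)
        ]
        errors = metric_result_matchers.verify_all(all_metrics, matchers)
        assert not errors, errors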
---
 sdks/python/apache_beam/metrics/execution.py       |  27 ++-
 sdks/python/apache_beam/metrics/execution_test.py  |  48 ++++
 .../runners/dataflow/dataflow_metrics.py           |  61 +++--
 .../runners/dataflow/dataflow_metrics_test.py      | 150 +++++++++++-
 .../apache_beam/testing/metric_result_matchers.py  | 185 ++++++++++++++
 .../testing/metric_result_matchers_test.py         | 267 +++++++++++++++++++++
 6 files changed, 706 insertions(+), 32 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index ba190ff..57a0a09 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -48,36 +48,38 @@ from apache_beam.runners.worker import statesampler
 class MetricKey(object):
   """Key used to identify instance of metric cell.
 
-  Metrics are internally keyed by the name of the step they're associated with
-  and the name of the metric.
+  Metrics are internally keyed by the name of the step they're associated with,
+  the name and namespace (if it is a user-defined metric) of the metric, and
+  any extra label metadata added by the runner-specific metric collection
+  service.
   """
-  def __init__(self, step, metric):
+  def __init__(self, step, metric, labels=None):
     """Initializes ``MetricKey``.
 
     Args:
       step: A string with the step this metric cell is part of.
-      metric: A ``MetricName`` that identifies a metric.
+      metric: A ``MetricName`` (namespace and name) identifying the metric.
+      labels: An arbitrary dict of labels that also identifies the metric.
     """
     self.step = step
     self.metric = metric
+    self.labels = labels if labels else dict()
 
   def __eq__(self, other):
     return (self.step == other.step and
-            self.metric == other.metric)
+            self.metric == other.metric and
+            self.labels == other.labels)
 
   def __ne__(self, other):
     # TODO(BEAM-5949): Needed for Python 2 compatibility.
     return not self == other
 
   def __hash__(self):
-    return hash((self.step, self.metric))
+    return hash((self.step, self.metric, frozenset(self.labels.items())))
 
   def __repr__(self):
-    return 'MetricKey(step={}, metric={})'.format(
-        self.step, self.metric)
-
-  def __hash__(self):
-    return hash((self.step, self.metric))
+    return 'MetricKey(step={}, metric={}, labels={})'.format(
+        self.step, self.metric, self.labels)
 
 
 class MetricResult(object):
@@ -122,6 +124,9 @@ class MetricResult(object):
     return 'MetricResult(key={}, committed={}, attempted={})'.format(
         self.key, str(self.committed), str(self.attempted))
 
+  def __str__(self):
+    return repr(self)
+
   @property
   def result(self):
     """Short-hand for falling back to attempted metrics if it seems that
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
index fbf5492..01c6615 100644
--- a/sdks/python/apache_beam/metrics/execution_test.py
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -21,10 +21,58 @@ import unittest
 from builtins import range
 
 from apache_beam.metrics.cells import CellCommitState
+from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.metricbase import MetricName
 
 
+class TestMetricKey(unittest.TestCase):
+  def test_equality_for_key_with_labels(self):
+    test_labels = {'label1': 'value1'}
+    test_object = MetricKey(
+        'step', MetricName('namespace', 'name'), labels=test_labels)
+    same_labels = MetricKey(
+        'step', MetricName('namespace', 'name'), labels={'label1': 'value1'})
+    same_label_reference = MetricKey(
+        'step', MetricName('namespace', 'name'), labels=test_labels)
+    self.assertEqual(test_object, same_labels)
+    self.assertEqual(test_object, same_label_reference)
+    self.assertEqual(hash(test_object), hash(same_labels))
+    self.assertEqual(hash(test_object), hash(same_label_reference))
+
+  def test_inequality_for_key_with_labels(self):
+    test_labels = {'label1': 'value1'}
+    test_object = MetricKey(
+        'step', MetricName('namespace', 'name'), labels=test_labels)
+    no_labels = MetricKey('step', MetricName('namespace', 'name'))
+    diff_label_key = MetricKey(
+        'step', MetricName('namespace', 'name'), labels={'l1_diff': 'value1'})
+    diff_label_value = MetricKey(
+        'step', MetricName('namespace', 'name'), labels={'label1': 'v1_diff'})
+    self.assertNotEqual(test_object, no_labels)
+    self.assertNotEqual(test_object, diff_label_key)
+    self.assertNotEqual(test_object, diff_label_value)
+    self.assertNotEqual(hash(test_object), hash(no_labels))
+    self.assertNotEqual(hash(test_object), hash(diff_label_key))
+    self.assertNotEqual(hash(test_object), hash(diff_label_value))
+
+  def test_equality_for_key_with_no_labels(self):
+    test_object = MetricKey('step', MetricName('namespace', 'name'))
+    same = MetricKey('step', MetricName('namespace', 'name'))
+    self.assertEqual(test_object, same)
+    self.assertEqual(hash(test_object), hash(same))
+
+    diff_step = MetricKey('step_diff', MetricName('namespace', 'name'))
+    diff_namespace = MetricKey('step', MetricName('namespace_diff', 'name'))
+    diff_name = MetricKey('step', MetricName('namespace', 'name_diff'))
+    self.assertNotEqual(test_object, diff_step)
+    self.assertNotEqual(test_object, diff_namespace)
+    self.assertNotEqual(test_object, diff_name)
+    self.assertNotEqual(hash(test_object), hash(diff_step))
+    self.assertNotEqual(hash(test_object), hash(diff_namespace))
+    self.assertNotEqual(hash(test_object), hash(diff_name))
+
+
 class TestMetricsContainer(unittest.TestCase):
   def test_create_new_counter(self):
     mc = MetricsContainer('astep')
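
A quick sketch of the extended MetricKey behavior exercised by the tests
above: labels default to an empty dict and now participate in equality and
hashing (the label value below is illustrative):

    from apache_beam.metrics.execution import MetricKey
    from apache_beam.metrics.metricbase import MetricName

    labeled = MetricKey('step1', MetricName('ns', 'counter1'),
                        labels={'original_name': 'step1-out0-ElementCount'})
    unlabeled = MetricKey('step1', MetricName('ns', 'counter1'))
    assert labeled != unlabeled  # Same step and name, but labels differ.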
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 6d86125..8a90cd6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -51,6 +51,11 @@ def _get_match(proto, filter_fn):
   return query[0]
 
 
+# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
+STRUCTURED_NAME_LABELS = [
+    'execution_step', 'original_name', 'output_user_name', 'step']
+
+
 class DataflowMetrics(MetricResults):
   """Implementation of MetricResults class for the Dataflow runner."""
 
@@ -97,7 +102,9 @@ class DataflowMetrics(MetricResults):
 
   def _get_metric_key(self, metric):
     """Populate the MetricKey object for a queried metric result."""
-    try:
+    step = ''
+    name = metric.name.name  # A metric name is always present.
+    try:  # Try to extract the user step name.
       # If ValueError is thrown within this try-block, it is because of
       # one of the following:
       # 1. Unable to translate the step name. Only happening with improperly
@@ -108,23 +115,39 @@ class DataflowMetrics(MetricResults):
       step = _get_match(metric.name.context.additionalProperties,
                         lambda x: x.key == 'step').value
       step = self._translate_step_name(step)
+    except ValueError:
+      pass
+
+    namespace = 'dataflow/v1b3'  # Default used if no namespace is found.
+    try:
       namespace = _get_match(metric.name.context.additionalProperties,
                              lambda x: x.key == 'namespace').value
-      name = metric.name.name
     except ValueError:
-      return None
-
-    return MetricKey(step, MetricName(namespace, name))
-
-  def _populate_metric_results(self, response):
-    """Take a list of metrics, and convert it to a list of MetricResult."""
-    user_metrics = [metric
-                    for metric in response.metrics
-                    if metric.name.origin == 'user']
+      pass
+
+    labels = dict()
+    for kv in metric.name.context.additionalProperties:
+      if kv.key in STRUCTURED_NAME_LABELS:
+        labels[kv.key] = kv.value
+    # Pack everything besides namespace and name into the labels as well,
+    # including the unmodified step names, so that integration tests can
+    # match the exact values which come from Dataflow.
+    return MetricKey(step, MetricName(namespace, name), labels=labels)
+
+  def _populate_metrics(self, response, result, user_metrics=False):
+    """Move metrics from the response into result as MetricResults."""
+    if user_metrics:
+      metrics = [metric
+                 for metric in response.metrics
+                 if metric.name.origin == 'user']
+    else:
+      metrics = [metric
+                 for metric in response.metrics
+                 if metric.name.origin == 'dataflow/v1b3']
 
     # Get the tentative/committed versions of every metric together.
     metrics_by_name = defaultdict(lambda: {})
-    for metric in user_metrics:
+    for metric in metrics:
       if (metric.name.name.endswith('[MIN]') or
           metric.name.name.endswith('[MAX]') or
           metric.name.name.endswith('[MEAN]') or
@@ -148,7 +171,6 @@ class DataflowMetrics(MetricResults):
       metrics_by_name[metric_key][tentative_or_committed] = metric
 
     # Now we create the MetricResult elements.
-    result = []
     for metric_key, metric in iteritems(metrics_by_name):
       attempted = self._get_metric_value(metric['tentative'])
       committed = self._get_metric_value(metric['committed'])
@@ -158,8 +180,6 @@ class DataflowMetrics(MetricResults):
                                  attempted=attempted,
                                  committed=committed))
 
-    return result
-
   def _get_metric_value(self, metric):
     """Get a metric result object from a MetricUpdate from Dataflow API."""
     if metric is None:
@@ -200,9 +220,18 @@ class DataflowMetrics(MetricResults):
       self._cached_metrics = job_metrics
     return job_metrics
 
+  def all_metrics(self):
+    """Return all user and system metrics from the dataflow service."""
+    metric_results = []
+    response = self._get_metrics_from_dataflow()
+    self._populate_metrics(response, metric_results, user_metrics=True)
+    self._populate_metrics(response, metric_results, user_metrics=False)
+    return metric_results
+
   def query(self, filter=None):
+    metric_results = []
     response = self._get_metrics_from_dataflow()
-    metric_results = self._populate_metric_results(response)
+    self._populate_metrics(response, metric_results, user_metrics=True)
     return {self.COUNTERS: [elm for elm in metric_results
                             if self.matches(filter, elm.key)
                             and DataflowMetrics._is_counter(elm)],
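
Note the split above: query() keeps its existing contract and returns only
user metrics, grouped by type, while the new all_metrics() returns a flat
list containing both user metrics and dataflow/v1b3 system metrics. A small
sketch (assuming `dm` is a DataflowMetrics instance for a finished job):

    user_counters = dm.query()['counters']  # User metrics only, by type.
    everything = dm.all_metrics()           # Flat list: user + system.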
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 1823b59..cdce02c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -33,6 +33,8 @@ from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.runners.dataflow import dataflow_metrics
+from apache_beam.testing import metric_result_matchers
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
 
 
 class DictToObject(object):
@@ -49,6 +51,8 @@ class DictToObject(object):
 
 class TestDataflowMetrics(unittest.TestCase):
 
+  # TODO(BEAM-6734): Write a dump tool to generate this fake data, or
+  # somehow make this easier to maintain.
   ONLY_COUNTERS_LIST = {"metrics": [
       {"name": {"context":
                 {"additionalProperties": [
@@ -202,6 +206,103 @@ class TestDataflowMetrics(unittest.TestCase):
        "updateTime": "2017-03-22T18:47:06.402Z"
       },
   ]}
+  SYSTEM_COUNTERS_LIST = {"metrics": [
+      # ElementCount
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "ToIsmRecordForMultimap-out0-ElementCount"},
+                    {"key": "output_user_name",
+                     "value": "ToIsmRecordForMultimap-out0"}
+                    ]
+                },
+                "name": "ElementCount",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 42},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "ToIsmRecordForMultimap-out0-ElementCount"},
+                    {"key": "output_user_name",
+                     "value": "ToIsmRecordForMultimap-out0"},
+                    {"key": "tentative",
+                     "value": "true"}
+                    ]
+                },
+                "name": "ElementCount",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 42},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      # MeanByteCount
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "Read-out0-MeanByteCount"},
+                    {"key": "output_user_name",
+                     "value": "GroupByKey/Read-out0"}
+                    ]
+                },
+                "name": "MeanByteCount",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 31},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "Read-out0-MeanByteCount"},
+                    {"key": "output_user_name",
+                     "value": "GroupByKey/Read-out0"},
+                    {"key": "tentative",
+                     "value": "true"}
+                    ]
+                },
+                "name": "MeanByteCount",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 31},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      # ExecutionTime
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "step",
+                     "value": "write/Write/Write"},
+                    ]
+                },
+                "name": "ExecutionTime_ProcessElement",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 1000},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "step",
+                     "value": "write/Write/Write"},
+                    {"key": "tentative",
+                     "value": "true"}
+                    ]
+                },
+                "name": "ExecutionTime_ProcessElement",
+                "origin": "dataflow/v1b3"
+               },
+       "scalar": {"integer_value": 1000},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+  ]}
 
   def setup_mock_client_result(self, counter_list=None):
     mock_client = mock.Mock()
@@ -238,8 +339,8 @@ class TestDataflowMetrics(unittest.TestCase):
     expected_counters = [
         MetricResult(
             MetricKey('split',
-                      MetricName('__main__.WordExtractingDoFn',
-                                 'word_lengths')),
+                      MetricName('__main__.WordExtractingDoFn', 'word_lengths'),
+                      labels={'step': 's2'}),
             109475, 109475),
         ]
     self.assertEqual(query_result['counters'], expected_counters)
@@ -248,7 +349,8 @@ class TestDataflowMetrics(unittest.TestCase):
         MetricResult(
             MetricKey('split',
                       MetricName('__main__.WordExtractingDoFn',
-                                 'word_length_dist')),
+                                 'word_length_dist'),
+                      labels={'step': 's2'}),
             DistributionResult(DistributionData(
                 18, 2, 2, 16)),
             DistributionResult(DistributionData(
@@ -265,11 +367,14 @@ class TestDataflowMetrics(unittest.TestCase):
     expected_counters = [
         MetricResult(
             MetricKey('split',
-                      MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
+                      MetricName('__main__.WordExtractingDoFn', 'empty_lines'),
+                      labels={'step': 's2'}
+                     ),
             1080, 1080),
         MetricResult(
             MetricKey('split',
-                      MetricName('__main__.WordExtractingDoFn', 'words')),
+                      MetricName('__main__.WordExtractingDoFn', 'words'),
+                      labels={'step': 's2'}),
             26181, 26185),
         ]
     self.assertEqual(sorted(query_result['counters'],
@@ -277,6 +382,41 @@ class TestDataflowMetrics(unittest.TestCase):
                      sorted(expected_counters,
                             key=lambda x: x.key.metric.name))
 
+  def test_system_counters_set_labels_and_step_name(self):
+    mock_client, mock_job_result = self.setup_mock_client_result(
+        self.SYSTEM_COUNTERS_LIST)
+    test_object = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+    all_metrics = test_object.all_metrics()
+
+    matchers = [
+        MetricResultMatcher(
+            name='ElementCount',
+            labels={
+                'original_name': 'ToIsmRecordForMultimap-out0-ElementCount',
+                'output_user_name': 'ToIsmRecordForMultimap-out0'
+            },
+            attempted=42,
+            committed=42
+        ),
+        MetricResultMatcher(
+            name='MeanByteCount',
+            labels={
+                'original_name': 'Read-out0-MeanByteCount',
+                'output_user_name': 'GroupByKey/Read-out0'
+            },
+            attempted=31,
+            committed=31
+        ),
+        MetricResultMatcher(
+            name='ExecutionTime_ProcessElement',
+            step='write/Write/Write',
+            attempted=1000,
+            committed=1000
+        )
+    ]
+    errors = metric_result_matchers.verify_all(all_metrics, matchers)
+    self.assertFalse(errors, errors)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/testing/metric_result_matchers.py b/sdks/python/apache_beam/testing/metric_result_matchers.py
new file mode 100644
index 0000000..126ba3d
--- /dev/null
+++ b/sdks/python/apache_beam/testing/metric_result_matchers.py
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""MetricResult matchers for validating metrics in PipelineResults.
+
+Example usage::
+
+    result = my_pipeline.run()
+    all_metrics = result.metrics().all_metrics()
+
+    matchers = [
+      MetricResultMatcher(
+          namespace='myNamespace',
+          name='myName',
+          step='myStep',
+          labels={
+              'pcollection': 'myCollection',
+              'myCustomKey': 'myCustomValue'
+          },
+          attempted=42,
+          committed=42
+      )
+    ]
+    errors = metric_result_matchers.verify_all(all_metrics, matchers)
+    self.assertFalse(errors, errors)
+
+"""
+
+from __future__ import absolute_import
+
+from hamcrest import equal_to
+from hamcrest.core import string_description
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.matcher import Matcher
+
+from apache_beam.metrics.cells import DistributionResult
+
+
+def _matcher_or_equal_to(value_or_matcher):
+  """Pass matchers through unchanged; wrap plain values in equal_to."""
+  if value_or_matcher is None:
+    return None
+  if isinstance(value_or_matcher, Matcher):
+    return value_or_matcher
+  return equal_to(value_or_matcher)
+
+
+class MetricResultMatcher(BaseMatcher):
+  """A PyHamcrest matcher that validates MetricResults."""
+
+  def __init__(self, namespace=None, name=None, step=None, labels=None,
+               attempted=None, committed=None):
+    self.namespace = _matcher_or_equal_to(namespace)
+    self.name = _matcher_or_equal_to(name)
+    self.step = _matcher_or_equal_to(step)
+    self.attempted = _matcher_or_equal_to(attempted)
+    self.committed = _matcher_or_equal_to(committed)
+    labels = labels or dict()
+    self.label_matchers = dict()
+    for (k, v) in labels.items():
+      self.label_matchers[_matcher_or_equal_to(k)] = _matcher_or_equal_to(v)
+
+  def _matches(self, metric_result):
+    if self.namespace is not None and not self.namespace.matches(
+        metric_result.key.metric.namespace):
+      return False
+    if self.name and not self.name.matches(metric_result.key.metric.name):
+      return False
+    if self.step and not self.step.matches(metric_result.key.step):
+      return False
+    if (self.attempted is not None and
+        not self.attempted.matches(metric_result.attempted)):
+      return False
+    if (self.committed is not None and
+        not self.committed.matches(metric_result.committed)):
+      return False
+    for (k_matcher, v_matcher) in self.label_matchers.items():
+      matched_keys = [key for key in metric_result.key.labels.keys() if
+                      k_matcher.matches(key)]
+      matched_key = matched_keys[0] if matched_keys else None
+      if not matched_key:
+        return False
+      label_value = metric_result.key.labels[matched_key]
+      if not v_matcher.matches(label_value):
+        return False
+    return True
+
+  def describe_to(self, description):
+    if self.namespace:
+      description.append_text(' namespace: ')
+      self.namespace.describe_to(description)
+    if self.name:
+      description.append_text(' name: ')
+      self.name.describe_to(description)
+    if self.step:
+      description.append_text(' step: ')
+      self.step.describe_to(description)
+    for (k_matcher, v_matcher) in self.label_matchers.items():
+      description.append_text(' (label_key: ')
+      k_matcher.describe_to(description)
+      description.append_text(' label_value: ')
+      v_matcher.describe_to(description)
+      description.append_text('). ')
+    if self.attempted is not None:
+      description.append_text(' attempted: ')
+      self.attempted.describe_to(description)
+    if self.committed is not None:
+      description.append_text(' committed: ')
+      self.committed.describe_to(description)
+
+  def describe_mismatch(self, metric_result, mismatch_description):
+    mismatch_description.append_text('was').append_value(metric_result)
+
+
+class DistributionMatcher(BaseMatcher):
+  """A PyHamcrest matcher that validates DistributionResults."""
+
+  def __init__(self, sum_value=None, count_value=None, min_value=None,
+               max_value=None):
+    self.sum_value = _matcher_or_equal_to(sum_value)
+    self.count_value = _matcher_or_equal_to(count_value)
+    self.min_value = _matcher_or_equal_to(min_value)
+    self.max_value = _matcher_or_equal_to(max_value)
+
+  def _matches(self, distribution_result):
+    if not isinstance(distribution_result, DistributionResult):
+      return False
+    if self.sum_value and not self.sum_value.matches(distribution_result.sum):
+      return False
+    if self.count_value and not self.count_value.matches(
+        distribution_result.count):
+      return False
+    if self.min_value and not self.min_value.matches(distribution_result.min):
+      return False
+    if self.max_value and not self.max_value.matches(distribution_result.max):
+      return False
+    return True
+
+  def describe_to(self, description):
+    if self.sum_value:
+      description.append_text(' sum_value: ')
+      self.sum_value.describe_to(description)
+    if self.count_value:
+      description.append_text(' count_value: ')
+      self.count_value.describe_to(description)
+    if self.min_value:
+      description.append_text(' min_value: ')
+      self.min_value.describe_to(description)
+    if self.max_value:
+      description.append_text(' max_value: ')
+      self.max_value.describe_to(description)
+
+  def describe_mismatch(self, distribution_result, mismatch_description):
+    mismatch_description.append_text('was').append_value(distribution_result)
+
+
+def verify_all(all_metrics, matchers):
+  """Verifies that every matcher matches a MetricResult in all_metrics."""
+  errors = []
+  for matcher in matchers:
+    matched_metrics = [mr for mr in all_metrics if matcher.matches(mr)]
+    if not matched_metrics:
+      errors.append('Unable to match metrics for matcher %s' % (
+          string_description.tostring(matcher)))
+  if errors:
+    errors.append('\nActual MetricResults:\n' +
+                  '\n'.join([str(mr) for mr in all_metrics]))
+  return '\n'.join(errors)
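
A short sketch combining the two matchers above, using custom hamcrest
matchers for a distribution-valued result (the metric name is illustrative):

    from hamcrest.library.number.ordering_comparison import greater_than

    from apache_beam.testing.metric_result_matchers import DistributionMatcher
    from apache_beam.testing.metric_result_matchers import MetricResultMatcher

    matcher = MetricResultMatcher(
        name='myDistribution',
        committed=DistributionMatcher(
            sum_value=greater_than(0),
            count_value=5))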
diff --git a/sdks/python/apache_beam/testing/metric_result_matchers_test.py b/sdks/python/apache_beam/testing/metric_result_matchers_test.py
new file mode 100644
index 0000000..dc674ac
--- /dev/null
+++ b/sdks/python/apache_beam/testing/metric_result_matchers_test.py
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the metric_result_matchers."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from hamcrest import anything
+from hamcrest import equal_to
+from hamcrest.core.core.isnot import is_not
+from hamcrest.library.number.ordering_comparison import greater_than
+from hamcrest.library.text.isequal_ignoring_case import equal_to_ignoring_case
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.metricbase import MetricName
+from apache_beam.testing.metric_result_matchers import DistributionMatcher
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
+
+EVERYTHING_DISTRIBUTION = {
+    'namespace': 'myNamespace',
+    'name': 'myName',
+    'step': 'myStep',
+    'attempted': {
+        'distribution': {
+            'sum': 12,
+            'count': 5,
+            'min': 0,
+            'max': 6,
+        }
+    },
+    'committed': {
+        'distribution': {
+            'sum': 12,
+            'count': 5,
+            'min': 0,
+            'max': 6,
+        }
+    },
+    'labels': {
+        'pcollection': 'myCollection',
+        'myCustomKey': 'myCustomValue'
+    }
+}
+
+EVERYTHING_COUNTER = {
+    'namespace': 'myNamespace',
+    'name': 'myName',
+    'step': 'myStep',
+    'attempted': {
+        'counter': 42
+    },
+    'committed': {
+        'counter': 42
+    },
+    'labels': {
+        'pcollection': 'myCollection',
+        'myCustomKey': 'myCustomValue'
+    }
+}
+
+
+def _create_metric_result(data_dict):
+  step = data_dict.get('step', '')
+  labels = data_dict.get('labels', dict())
+  values = {}
+  for key in ['attempted', 'committed']:
+    if key in data_dict:
+      if 'counter' in data_dict[key]:
+        values[key] = data_dict[key]['counter']
+      elif 'distribution' in data_dict[key]:
+        distribution = data_dict[key]['distribution']
+        values[key] = DistributionResult(DistributionData(
+            distribution['sum'],
+            distribution['count'],
+            distribution['min'],
+            distribution['max'],
+        ))
+  attempted = values.get('attempted')
+  committed = values.get('committed')
+
+  metric_name = MetricName(data_dict['namespace'], data_dict['name'])
+  metric_key = MetricKey(step, metric_name, labels)
+  return MetricResult(metric_key, committed, attempted)
+
+
+class MetricResultMatchersTest(unittest.TestCase):
+
+  def test_matches_all_for_counter(self):
+    metric_result = _create_metric_result(EVERYTHING_COUNTER)
+    matcher = MetricResultMatcher(
+        namespace='myNamespace',
+        name='myName',
+        step='myStep',
+        labels={
+            'pcollection': 'myCollection',
+            'myCustomKey': 'myCustomValue'
+        },
+        attempted=42,
+        committed=42
+    )
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_none_for_counter(self):
+    metric_result = _create_metric_result(EVERYTHING_COUNTER)
+    matcher = MetricResultMatcher(
+        namespace=is_not(equal_to('invalidNamespace')),
+        name=is_not(equal_to('invalidName')),
+        step=is_not(equal_to('invalidStep')),
+        labels={
+            is_not(equal_to('invalidPcollection')): anything(),
+            is_not(equal_to('invalidCustomKey')): is_not(equal_to(
+                'invalidCustomValue'))
+        },
+        attempted=is_not(equal_to(1000)),
+        committed=is_not(equal_to(1000)))
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_all_for_distribution(self):
+    metric_result = _create_metric_result(EVERYTHING_DISTRIBUTION)
+    matcher = MetricResultMatcher(
+        namespace='myNamespace',
+        name='myName',
+        step='myStep',
+        labels={
+            'pcollection': 'myCollection',
+            'myCustomKey': 'myCustomValue'
+        },
+        committed=DistributionMatcher(
+            sum_value=12,
+            count_value=5,
+            min_value=0,
+            max_value=6
+        ),
+        attempted=DistributionMatcher(
+            sum_value=12,
+            count_value=5,
+            min_value=0,
+            max_value=6
+        ),
+    )
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_none_for_distribution(self):
+    metric_result = _create_metric_result(EVERYTHING_DISTRIBUTION)
+    matcher = MetricResultMatcher(
+        namespace=is_not(equal_to('invalidNamespace')),
+        name=is_not(equal_to('invalidName')),
+        step=is_not(equal_to('invalidStep')),
+        labels={
+            is_not(equal_to('invalidPcollection')): anything(),
+            is_not(equal_to('invalidCustomKey')): is_not(equal_to(
+                'invalidCustomValue'))
+        },
+        committed=is_not(DistributionMatcher(
+            sum_value=120,
+            count_value=50,
+            min_value=100,
+            max_value=60
+        )),
+        attempted=is_not(DistributionMatcher(
+            sum_value=120,
+            count_value=50,
+            min_value=100,
+            max_value=60
+        )),
+    )
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_key_but_not_value(self):
+    metric_result = _create_metric_result(EVERYTHING_COUNTER)
+    matcher = is_not(MetricResultMatcher(
+        labels={
+            'pcollection': 'invalidCollection'
+        }))
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_counter_with_custom_matchers(self):
+    metric_result = _create_metric_result(EVERYTHING_COUNTER)
+    matcher = is_not(MetricResultMatcher(
+        namespace=equal_to_ignoring_case('MYNAMESPACE'),
+        name=equal_to_ignoring_case('MYNAME'),
+        step=equal_to_ignoring_case('MYSTEP'),
+        labels={
+            equal_to_ignoring_case('PCOLLECTION'):
+                equal_to_ignoring_case('MYCUSTOMVALUE'),
+            'myCustomKey': equal_to_ignoring_case('MYCUSTOMVALUE')
+        },
+        committed=greater_than(0),
+        attempted=greater_than(0)
+    ))
+    hc_assert_that(metric_result, matcher)
+
+  def test_matches_distribution_with_custom_matchers(self):
+    metric_result = _create_metric_result(EVERYTHING_DISTRIBUTION)
+    matcher = is_not(MetricResultMatcher(
+        namespace=equal_to_ignoring_case('MYNAMESPACE'),
+        name=equal_to_ignoring_case('MYNAME'),
+        step=equal_to_ignoring_case('MYSTEP'),
+        labels={
+            equal_to_ignoring_case('PCOLLECTION'):
+                equal_to_ignoring_case('MYCUSTOMVALUE'),
+            'myCustomKey': equal_to_ignoring_case('MYCUSTOMVALUE')
+        },
+        committed=is_not(DistributionMatcher(
+            sum_value=greater_than(-1),
+            count_value=greater_than(-1),
+            min_value=greater_than(-1),
+            max_value=greater_than(-1)
+        )),
+        attempted=is_not(DistributionMatcher(
+            sum_value=greater_than(-1),
+            count_value=greater_than(-1),
+            min_value=greater_than(-1),
+            max_value=greater_than(-1)
+        )),
+    ))
+    hc_assert_that(metric_result, matcher)
+
+  def test_counter_does_not_match_distribution_and_doesnt_crash(self):
+    metric_result = _create_metric_result(EVERYTHING_COUNTER)
+    matcher = is_not(MetricResultMatcher(
+        committed=DistributionMatcher(
+            sum_value=120,
+            count_value=50,
+            min_value=100,
+            max_value=60
+        ),
+        attempted=DistributionMatcher(
+            sum_value=120,
+            count_value=50,
+            min_value=100,
+            max_value=60
+        ),
+    ))
+    hc_assert_that(metric_result, matcher)
+
+  def test_distribution_does_not_match_counter_and_doesnt_crash(self):
+    metric_result = _create_metric_result(EVERYTHING_DISTRIBUTION)
+    matcher = is_not(MetricResultMatcher(
+        attempted=42,
+        committed=42
+    ))
+    hc_assert_that(metric_result, matcher)
+
+
+if __name__ == '__main__':
+  unittest.main()