You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/11/18 22:29:07 UTC

[beam] branch master updated: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 53a26ae  [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
     new 7c28db2  Merge pull request #13217 from [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
53a26ae is described below

commit 53a26aee3f35b6dabb62661a7186217fc5c4442c
Author: Alex Amato <aj...@google.com>
AuthorDate: Tue Nov 17 11:00:16 2020 -0800

    [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
---
 sdks/python/apache_beam/internal/metrics/metric.py | 90 ++++++++++++++++++++++
 .../apache_beam/internal/metrics/metric_test.py    | 41 ++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 36 ++++++++-
 .../gcp/resource_identifiers.py}                   | 34 +++-----
 sdks/python/apache_beam/metrics/cells.py           | 22 ++++--
 sdks/python/apache_beam/metrics/execution.pxd      |  4 +-
 sdks/python/apache_beam/metrics/execution.py       | 71 +++++++++++------
 sdks/python/apache_beam/metrics/metric.py          | 18 +++--
 sdks/python/apache_beam/metrics/metricbase.py      | 42 +++++++---
 .../python/apache_beam/metrics/monitoring_infos.py | 21 ++++-
 .../apache_beam/metrics/monitoring_infos_test.py   | 35 +++++++++
 .../apache_beam/runners/worker/sdk_worker.py       | 38 ++++++---
 .../apache_beam/runners/worker/sdk_worker_test.py  | 60 +++++++++++++++
 13 files changed, 426 insertions(+), 86 deletions(-)

diff --git a/sdks/python/apache_beam/internal/metrics/metric.py b/sdks/python/apache_beam/internal/metrics/metric.py
index 2fbb963..069919e 100644
--- a/sdks/python/apache_beam/internal/metrics/metric.py
+++ b/sdks/python/apache_beam/internal/metrics/metric.py
@@ -39,6 +39,7 @@ from typing import Type
 from typing import Union
 
 from apache_beam.internal.metrics.cells import HistogramCellFactory
+from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricUpdater
 from apache_beam.metrics.metric import Metrics as UserMetrics
 from apache_beam.metrics.metricbase import Histogram
@@ -49,6 +50,13 @@ if TYPE_CHECKING:
   from apache_beam.metrics.cells import MetricCellFactory
   from apache_beam.utils.histogram import BucketType
 
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+
 __all__ = ['Metrics']
 
 _LOGGER = logging.getLogger(__name__)
@@ -56,6 +64,27 @@ _LOGGER = logging.getLogger(__name__)
 
 class Metrics(object):
   @staticmethod
+  def counter(urn, labels=None, process_wide=False):
+    # type: (str, Optional[Dict[str, str]], bool) -> UserMetrics.DelegatingCounter
+
+    """Obtains or creates a Counter metric.
+
+    Args:
+      namespace: A class or string that gives the namespace to a metric
+      name: A string that gives a unique name to a metric
+      urn: URN to populate on a MonitoringInfo, when sending to RunnerHarness.
+      labels: Labels to populate on a MonitoringInfo
+      process_wide: Whether or not the metric is specific to the current bundle
+          or should be calculated for the entire process.
+
+    Returns:
+      A Counter object.
+    """
+    return UserMetrics.DelegatingCounter(
+        MetricName(namespace=None, name=None, urn=urn, labels=labels),
+        process_wide=process_wide)
+
+  @staticmethod
   def histogram(namespace, name, bucket_type, logger=None):
     # type: (Union[Type, str], str, BucketType, Optional[MetricLogger]) -> Metrics.DelegatingHistogram
 
@@ -136,3 +165,64 @@ class MetricLogger(object):
           self._last_logging_millis = current_millis
       finally:
         self._lock.release()
+
+
+class ServiceCallMetric(object):
+  """Metric class which records Service API call metrics.
+
+  This class will capture a request count metric for the specified
+  request_count_urn and base_labels.
+
+  When call() is invoked the status must be provided, which will
+  be converted to a canonical GCP status code, if possible.
+
+  TODO(ajamato): Add Request latency metric.
+  """
+  def __init__(self, request_count_urn, base_labels=None):
+    # type: (str, Optional[Dict[str, str]]) -> None
+    self.base_labels = base_labels if base_labels else {}
+    self.request_count_urn = request_count_urn
+
+  def call(self, status):
+    # type: (Union[int, str, HttpError]) -> None
+
+    """Record the status of the call into appropriate metrics."""
+    canonical_status = self.convert_to_canonical_status_string(status)
+    additional_labels = {monitoring_infos.STATUS_LABEL: canonical_status}
+
+    labels = dict(
+        list(self.base_labels.items()) + list(additional_labels.items()))
+
+    request_counter = Metrics.counter(
+        urn=self.request_count_urn, labels=labels, process_wide=True)
+    request_counter.inc()
+
+  def convert_to_canonical_status_string(self, status):
+    # type: (Union[int, str, HttpError]) -> str
+
+    """Converts a status to a canonical GCP status cdoe string."""
+    http_status_code = None
+    if isinstance(status, int):
+      http_status_code = status
+    elif isinstance(status, str):
+      return status.lower()
+    elif isinstance(status, HttpError):
+      http_status_code = int(status.status_code)
+    http_to_canonical_gcp_status = {
+        200: 'ok',
+        400: 'out_of_range',
+        401: 'unauthenticated',
+        403: 'permission_denied',
+        404: 'not_found',
+        409: 'already_exists',
+        429: 'resource_exhausted',
+        499: 'cancelled',
+        500: 'internal',
+        501: 'not_implemented',
+        503: 'unavailable',
+        504: 'deadline_exceeded'
+    }
+    if (http_status_code is not None and
+        http_status_code in http_to_canonical_gcp_status):
+      return http_to_canonical_gcp_status[http_status_code]
+    return str(http_status_code)
diff --git a/sdks/python/apache_beam/internal/metrics/metric_test.py b/sdks/python/apache_beam/internal/metrics/metric_test.py
index 88c2903..bef7713 100644
--- a/sdks/python/apache_beam/internal/metrics/metric_test.py
+++ b/sdks/python/apache_beam/internal/metrics/metric_test.py
@@ -24,9 +24,14 @@ import unittest
 from mock import patch
 
 from apache_beam.internal.metrics.cells import HistogramCellFactory
+from apache_beam.internal.metrics.metric import Metrics as InternalMetrics
 from apache_beam.internal.metrics.metric import MetricLogger
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metric import Metrics
 from apache_beam.metrics.metricbase import MetricName
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils import counters
 from apache_beam.utils.histogram import LinearBucket
 
 
@@ -48,5 +53,41 @@ class MetricLoggerTest(unittest.TestCase):
         Contains('HistogramData(Total count: 1, P99: 2, P90: 2, P50: 2)'))
 
 
+class MetricsTest(unittest.TestCase):
+  def test_create_process_wide(self):
+    sampler = statesampler.StateSampler('', counters.CounterFactory())
+    statesampler.set_current_tracker(sampler)
+    state1 = sampler.scoped_state(
+        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+
+    try:
+      sampler.start()
+      with state1:
+        urn = "my:custom:urn"
+        labels = {'key': 'value'}
+        counter = InternalMetrics.counter(
+            urn=urn, labels=labels, process_wide=True)
+        # Test that if process_wide is set, that it will be set
+        # on the process_wide container.
+        counter.inc(10)
+        self.assertTrue(isinstance(counter, Metrics.DelegatingCounter))
+
+        del counter
+
+        metric_name = MetricName(None, None, urn=urn, labels=labels)
+        # Expect a value set on the current container.
+        self.assertEqual(
+            MetricsEnvironment.process_wide_container().get_counter(
+                metric_name).get_cumulative(),
+            10)
+        # Expect no value set on the current container.
+        self.assertEqual(
+            MetricsEnvironment.current_container().get_counter(
+                metric_name).get_cumulative(),
+            0)
+    finally:
+      sampler.stop()
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 76d60ec..da1b7fd 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -52,9 +52,12 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.internal.metrics.metric import MetricLogger
 from apache_beam.internal.metrics.metric import Metrics
+from apache_beam.internal.metrics.metric import ServiceCallMetric
 from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.metrics import monitoring_infos
 from apache_beam.options import value_provider
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -566,14 +569,43 @@ class BigQueryWrapper(object):
             skipInvalidRows=skip_invalid_rows,
             # TODO(silviuc): Should have an option for ignoreUnknownValues?
             rows=rows))
+
+    resource = resource_identifiers.BigQueryTable(
+        project_id, dataset_id, table_id)
+
+    labels = {
+        # TODO(ajamato): Add Ptransform label.
+        monitoring_infos.SERVICE_LABEL: 'BigQuery',
+        # Refer to any method which writes elements to BigQuery in batches
+        # as "BigQueryBatchWrite". I.e. storage API's insertAll, or future
+        # APIs introduced.
+        monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
+        monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
+    }
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+
     started_millis = int(time.time() * 1000)
+    response = None
     try:
       response = self.client.tabledata.InsertAll(request)
-      # response.insertErrors is not [] if errors encountered.
+      if not response.insertErrors:
+        service_call_metric.call('ok')
+      for insert_error in response.insertErrors:
+        for error in insert_error.errors:
+          service_call_metric.call(error.reason)
+    except HttpError as e:
+      service_call_metric.call(e)
     finally:
       self._latency_histogram_metric.update(
           int(time.time() * 1000) - started_millis)
-    return not response.insertErrors, response.insertErrors
+    if response:
+      return not response.insertErrors, response.insertErrors
+    return False, []
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/io/gcp/resource_identifiers.py
similarity index 56%
copy from sdks/python/apache_beam/metrics/execution.pxd
copy to sdks/python/apache_beam/io/gcp/resource_identifiers.py
index 6e1cbb0..fb6ebbe 100644
--- a/sdks/python/apache_beam/metrics/execution.pxd
+++ b/sdks/python/apache_beam/io/gcp/resource_identifiers.py
@@ -15,31 +15,21 @@
 # limitations under the License.
 #
 
-cimport cython
-cimport libc.stdint
+"""Helper functions to generate resource labels strings for GCP entitites
 
-from apache_beam.metrics.cells cimport MetricCell
+These can be used on MonitoringInfo 'resource' labels.
 
+See example entities:
+    https://s.apache.org/beam-gcp-debuggability
 
-cdef object get_current_tracker
+For GCP entities, populate the RESOURCE label with the aip.dev/122 format:
+https://google.aip.dev/122
 
+If an official GCP format does not exist, try to use the following format.
+    //whatever.googleapis.com/parents/{parentId}/whatevers/{whateverId}
+"""
 
-cdef class _TypedMetricName(object):
-  cdef readonly object cell_type
-  cdef readonly object metric_name
-  cdef readonly object fast_name
-  cdef libc.stdint.int64_t _hash
 
-
-cdef object _DEFAULT
-
-
-cdef class MetricUpdater(object):
-  cdef _TypedMetricName typed_metric_name
-  cdef object default
-
-
-cdef class MetricsContainer(object):
-  cdef object step_name
-  cdef public dict metrics
-  cpdef MetricCell get_metric_cell(self, metric_key)
+def BigQueryTable(project_id, dataset_id, table_id):
+  return '//bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s' % (
+      project_id, dataset_id, table_id)
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index f638698..a7b7938 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -121,7 +121,11 @@ class CounterCell(MetricCell):
   def update(self, value):
     if cython.compiled:
       ivalue = value
-      # We hold the GIL, no need for another lock.
+      # Since We hold the GIL, no need for another lock.
+      # And because the C threads won't preempt and interleave
+      # each other.
+      # Assuming there is no code trying to access the counters
+      # directly by circumventing the GIL.
       self.value += ivalue
     else:
       with self._lock:
@@ -134,11 +138,17 @@ class CounterCell(MetricCell):
 
   def to_runner_api_monitoring_info(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
-    return monitoring_infos.int64_user_counter(
-        name.namespace,
-        name.name,
-        self.get_cumulative(),
-        ptransform=transform_id)
+    if not name.urn:
+      # User counter case.
+      return monitoring_infos.int64_user_counter(
+          name.namespace,
+          name.name,
+          self.get_cumulative(),
+          ptransform=transform_id)
+    else:
+      # Arbitrary URN case.
+      return monitoring_infos.int64_counter(
+          name.urn, self.get_cumulative(), labels=name.labels)
 
 
 class DistributionCell(MetricCell):
diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd
index 6e1cbb0..3605d82 100644
--- a/sdks/python/apache_beam/metrics/execution.pxd
+++ b/sdks/python/apache_beam/metrics/execution.pxd
@@ -20,7 +20,6 @@ cimport libc.stdint
 
 from apache_beam.metrics.cells cimport MetricCell
 
-
 cdef object get_current_tracker
 
 
@@ -36,7 +35,8 @@ cdef object _DEFAULT
 
 cdef class MetricUpdater(object):
   cdef _TypedMetricName typed_metric_name
-  cdef object default
+  cdef object default_value
+  cdef bint process_wide  # bint is used to represent C++ bool.
 
 
 cdef class MetricsContainer(object):
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 4afd815..453be6b 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -163,24 +163,28 @@ class _MetricsEnvironment(object):
       return None
     return sampler.current_state().metrics_container
 
+  def process_wide_container(self):
+    """Returns the MetricsContainer for process wide metrics, e.g. memory."""
+    return PROCESS_WIDE_METRICS_CONTAINER
+
 
 MetricsEnvironment = _MetricsEnvironment()
 
 
 class _TypedMetricName(object):
   """Like MetricName, but also stores the cell type of the metric."""
-  def __init__(self,
-               cell_type,  # type: Union[Type[MetricCell], MetricCellFactory]
-               metric_name  # type: Union[str, MetricName]
-              ):
+  def __init__(
+      self,
+      cell_type,  # type: Union[Type[MetricCell], MetricCellFactory]
+      metric_name  # type: Union[str, MetricName]
+  ):
     # type: (...) -> None
     self.cell_type = cell_type
     self.metric_name = metric_name
     if isinstance(metric_name, str):
       self.fast_name = metric_name
     else:
-      self.fast_name = '%d_%s%s' % (
-          len(metric_name.name), metric_name.name, metric_name.namespace)
+      self.fast_name = metric_name.fast_name()
     # Cached for speed, as this is used as a key for every counter update.
     self._hash = hash((cell_type, self.fast_name))
 
@@ -194,6 +198,9 @@ class _TypedMetricName(object):
   def __hash__(self):
     return self._hash
 
+  def __str__(self):
+    return '%s %s' % (self.cell_type, self.metric_name)
+
   def __reduce__(self):
     return _TypedMetricName, (self.cell_type, self.metric_name)
 
@@ -203,33 +210,43 @@ _DEFAULT = None  # type: Any
 
 class MetricUpdater(object):
   """A callable that updates the metric as quickly as possible."""
-  def __init__(self,
-               cell_type,  # type: Union[Type[MetricCell], MetricCellFactory]
-               metric_name,  # type: Union[str, MetricName]
-               default=None):
+  def __init__(
+      self,
+      cell_type,  # type: Union[Type[MetricCell], MetricCellFactory]
+      metric_name,  # type: Union[str, MetricName]
+      default_value=None,
+      process_wide=False):
+    self.process_wide = process_wide
     self.typed_metric_name = _TypedMetricName(cell_type, metric_name)
-    self.default = default
+    self.default_value = default_value
 
   def __call__(self, value=_DEFAULT):
     # type: (Any) -> None
     if value is _DEFAULT:
-      if self.default is _DEFAULT:
+      if self.default_value is _DEFAULT:
         raise ValueError(
             'Missing value for update of %s' % self.typed_metric_name.fast_name)
-      value = self.default
-    tracker = get_current_tracker()
-    if tracker is not None:
-      tracker.update_metric(self.typed_metric_name, value)
+      value = self.default_value
+    if self.process_wide:
+      MetricsEnvironment.process_wide_container().get_metric_cell(
+          self.typed_metric_name).update(value)
+    else:
+      tracker = get_current_tracker()
+      if tracker is not None:
+        tracker.update_metric(self.typed_metric_name, value)
 
   def __reduce__(self):
     return MetricUpdater, (
         self.typed_metric_name.cell_type,
         self.typed_metric_name.metric_name,
-        self.default)
+        self.default_value)
 
 
 class MetricsContainer(object):
-  """Holds the metrics of a single step and a single bundle."""
+  """Holds the metrics of a single step and a single bundle.
+
+  Or the metrics associated with the process/SDK harness. I.e. memory usage.
+  """
   def __init__(self, step_name):
     self.step_name = step_name
     self.metrics = dict()  # type: Dict[_TypedMetricName, MetricCell]
@@ -296,14 +313,14 @@ class MetricsContainer(object):
     # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     """Returns a list of MonitoringInfos for the metrics in this container."""
-    all_user_metrics = [
+    all_metrics = [
         cell.to_runner_api_monitoring_info(key.metric_name, transform_id)
         for key,
         cell in self.metrics.items()
     ]
     return {
         monitoring_infos.to_key(mi): mi
-        for mi in all_user_metrics if mi is not None
+        for mi in all_metrics if mi is not None
     }
 
   def reset(self):
@@ -315,6 +332,9 @@ class MetricsContainer(object):
     raise NotImplementedError
 
 
+PROCESS_WIDE_METRICS_CONTAINER = MetricsContainer(None)
+
+
 class MetricUpdates(object):
   """Contains updates for several metrics.
 
@@ -322,11 +342,12 @@ class MetricUpdates(object):
   For Distribution metrics, it is DistributionData, and for Counter metrics,
   it's an int.
   """
-  def __init__(self,
-               counters=None,  # type: Optional[Dict[MetricKey, int]]
-               distributions=None,  # type: Optional[Dict[MetricKey, DistributionData]]
-               gauges=None  # type: Optional[Dict[MetricKey, GaugeData]]
-              ):
+  def __init__(
+      self,
+      counters=None,  # type: Optional[Dict[MetricKey, int]]
+      distributions=None,  # type: Optional[Dict[MetricKey, DistributionData]]
+      gauges=None  # type: Optional[Dict[MetricKey, GaugeData]]
+  ):
     # type: (...) -> None
 
     """Create a MetricUpdates object.
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 335c568..2ca0984 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -123,10 +123,14 @@ class Metrics(object):
 
   class DelegatingCounter(Counter):
     """Metrics Counter that Delegates functionality to MetricsEnvironment."""
-    def __init__(self, metric_name):
-      # type: (MetricName) -> None
+    def __init__(self, metric_name, process_wide=False):
+      # type: (MetricName, bool) -> None
       super(Metrics.DelegatingCounter, self).__init__(metric_name)
-      self.inc = MetricUpdater(cells.CounterCell, metric_name, default=1)  # type: ignore[assignment]
+      self.inc = MetricUpdater(  # type: ignore[assignment]
+          cells.CounterCell,
+          metric_name,
+          default_value=1,
+          process_wide=process_wide)
 
   class DelegatingDistribution(Distribution):
     """Metrics Distribution Delegates functionality to MetricsEnvironment."""
@@ -233,6 +237,8 @@ class MetricsFilter(object):
   sets. No execution/matching logic is added to this object, so that it may
   be used to construct arguments as an RPC request. It is left for runners
   to implement matching logic by themselves.
+
+  Note: This class only supports user defined metrics.
   """
   def __init__(self):
     # type: () -> None
@@ -257,9 +263,9 @@ class MetricsFilter(object):
 
   def with_metric(self, metric):
     # type: (Metric) -> MetricsFilter
-    return (
-        self.with_name(metric.metric_name.name).with_namespace(
-            metric.metric_name.namespace))
+    name = metric.metric_name.name or ''
+    namespace = metric.metric_name.namespace or ''
+    return self.with_name(name).with_namespace(namespace)
 
   def with_name(self, name):
     # type: (str) -> MetricsFilter
diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
index 2c9e6a8..f29641e 100644
--- a/sdks/python/apache_beam/metrics/metricbase.py
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -37,6 +37,8 @@ Available classes:
 from __future__ import absolute_import
 
 from builtins import object
+from typing import Dict
+from typing import Optional
 
 __all__ = [
     'Metric', 'Counter', 'Distribution', 'Gauge', 'Histogram', 'MetricName'
@@ -50,34 +52,56 @@ class MetricName(object):
   allows grouping related metrics together and also prevents collisions
   between multiple metrics of the same name.
   """
-  def __init__(self, namespace, name):
-    # type: (str, str) -> None
+  def __init__(self, namespace, name, urn=None, labels=None):
+    # type: (Optional[str], Optional[str], Optional[str], Optional[Dict[str, str]]) -> None
 
     """Initializes ``MetricName``.
 
+    Note: namespace and name should be set for user metrics,
+    urn and labels should be set for an arbitrary metric to package into a
+    MonitoringInfo.
+
     Args:
       namespace: A string with the namespace of a metric.
       name: A string with the name of a metric.
+      urn: URN to populate on a MonitoringInfo, when sending to RunnerHarness.
+      labels: Labels to populate on a MonitoringInfo
     """
-    if not namespace:
-      raise ValueError('Metric namespace must be non-empty')
-    if not name:
-      raise ValueError('Metric name must be non-empty')
+    if not urn:
+      if not namespace:
+        raise ValueError('Metric namespace must be non-empty')
+      if not name:
+        raise ValueError('Metric name must be non-empty')
     self.namespace = namespace
     self.name = name
+    self.urn = urn
+    self.labels = labels if labels else {}
 
   def __eq__(self, other):
-    return self.namespace == other.namespace and self.name == other.name
+    return (
+        self.namespace == other.namespace and self.name == other.name and
+        self.urn == other.urn and self.labels == other.labels)
 
   def __ne__(self, other):
     # TODO(BEAM-5949): Needed for Python 2 compatibility.
     return not self == other
 
   def __str__(self):
-    return 'MetricName(namespace={}, name={})'.format(self.namespace, self.name)
+    return 'MetricName(namespace={}, name={}, urn={}, labels={})'.format(
+        self.namespace, self.name, self.urn, self.labels)
 
   def __hash__(self):
-    return hash((self.namespace, self.name))
+    return hash((self.namespace, self.name, self.urn) +
+                tuple(self.labels.items()))
+
+  def fast_name(self):
+    name = self.name or ''
+    namespace = self.namespace or ''
+    urn = self.urn or ''
+    labels = ''
+    if self.labels:
+      labels = '_'.join(['%s=%s' % (k, v) for (k, v) in self.labels.items()])
+    return '%d_%s%s%s%s' % (len(name), name, namespace, urn, labels)
 
 
 class Metric(object):
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 7afd190..33bb1ca 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -59,6 +59,8 @@ WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
 WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
 DATA_CHANNEL_READ_INDEX = (
     common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn)
+API_REQUEST_COUNT_URN = (
+    common_urns.monitoring_info_specs.API_REQUEST_COUNT.spec.urn)
 
 # TODO(ajamato): Implement the remaining types, i.e. Double types
 # Extrema types, etc. See:
@@ -81,6 +83,20 @@ PTRANSFORM_LABEL = (
 NAMESPACE_LABEL = (
     common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
 NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
+SERVICE_LABEL = (common_urns.monitoring_info_labels.SERVICE.label_props.name)
+METHOD_LABEL = (common_urns.monitoring_info_labels.METHOD.label_props.name)
+RESOURCE_LABEL = (common_urns.monitoring_info_labels.RESOURCE.label_props.name)
+STATUS_LABEL = (common_urns.monitoring_info_labels.STATUS.label_props.name)
+BIGQUERY_PROJECT_ID_LABEL = (
+    common_urns.monitoring_info_labels.BIGQUERY_PROJECT_ID.label_props.name)
+BIGQUERY_DATASET_LABEL = (
+    common_urns.monitoring_info_labels.BIGQUERY_DATASET.label_props.name)
+BIGQUERY_TABLE_LABEL = (
+    common_urns.monitoring_info_labels.BIGQUERY_TABLE.label_props.name)
+BIGQUERY_VIEW_LABEL = (
+    common_urns.monitoring_info_labels.BIGQUERY_VIEW.label_props.name)
+BIGQUERY_QUERY_NAME_LABEL = (
+    common_urns.monitoring_info_labels.BIGQUERY_QUERY_NAME.label_props.name)
 
 
 def extract_counter_value(monitoring_info_proto):
@@ -151,7 +167,7 @@ def int64_user_counter(namespace, name, metric, ptransform=None):
       USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)
 
 
-def int64_counter(urn, metric, ptransform=None, pcollection=None):
+def int64_counter(urn, metric, ptransform=None, pcollection=None, labels=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return the counter monitoring info for the specifed URN, metric and labels.
@@ -162,7 +178,8 @@ def int64_counter(urn, metric, ptransform=None, pcollection=None):
     ptransform: The ptransform id used as a label.
     pcollection: The pcollection id used as a label.
   """
-  labels = create_labels(ptransform=ptransform, pcollection=pcollection)
+  labels = labels or dict()
+  labels.update(create_labels(ptransform=ptransform, pcollection=pcollection))
   if isinstance(metric, int):
     metric = coders.VarIntCoder().encode(metric)
   return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos_test.py b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
index eb1d48a..2855e23 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos_test.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 import unittest
 
 from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.cells import CounterCell
 from apache_beam.metrics.cells import GaugeCell
 
 
@@ -72,6 +73,40 @@ class MonitoringInfosTest(unittest.TestCase):
     _, gauge_value = monitoring_infos.extract_gauge_value(result)
     self.assertEqual(0, gauge_value)
 
+  def test_int64_user_counter(self):
+    expected_labels = {}
+    expected_labels[monitoring_infos.NAMESPACE_LABEL] = "counternamespace"
+    expected_labels[monitoring_infos.NAME_LABEL] = "countername"
+
+    metric = CounterCell().get_cumulative()
+    result = monitoring_infos.int64_user_counter(
+        'counternamespace', 'countername', metric)
+    counter_value = monitoring_infos.extract_counter_value(result)
+
+    self.assertEqual(0, counter_value)
+    self.assertEqual(result.labels, expected_labels)
+
+  def test_int64_counter(self):
+    expected_labels = {}
+    expected_labels[monitoring_infos.PCOLLECTION_LABEL] = "collectionname"
+    expected_labels[monitoring_infos.PTRANSFORM_LABEL] = "ptransformname"
+    expected_labels[monitoring_infos.SERVICE_LABEL] = "BigQuery"
+
+    labels = {
+        monitoring_infos.SERVICE_LABEL: "BigQuery",
+    }
+    metric = CounterCell().get_cumulative()
+    result = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN,
+        metric,
+        ptransform="ptransformname",
+        pcollection="collectionname",
+        labels=labels)
+    counter_value = monitoring_infos.extract_counter_value(result)
+
+    self.assertEqual(0, counter_value)
+    self.assertEqual(result.labels, expected_labels)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index b3c095c..1890992 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -59,6 +59,7 @@ from future.utils import with_metaclass
 
 from apache_beam.coders import coder_impl
 from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import metrics_pb2
@@ -328,6 +329,31 @@ class SdkHarness(object):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
     self._request_execute(request)
 
+  def _request_harness_monitoring_infos(self, request):
+    # type: (beam_fn_api_pb2.InstructionRequest) -> None
+    process_wide_monitoring_infos = MetricsEnvironment.process_wide_container(
+    ).to_runner_api_monitoring_infos(None).values()
+    self._execute(
+        lambda: beam_fn_api_pb2.InstructionResponse(
+            instruction_id=request.instruction_id,
+            harness_monitoring_infos=(
+                beam_fn_api_pb2.HarnessMonitoringInfosResponse(
+                    monitoring_data={
+                        SHORT_ID_CACHE.getShortId(info): info.payload
+                        for info in process_wide_monitoring_infos
+                    }))),
+        request)
+
+  def _request_monitoring_infos(self, request):
+    # type: (beam_fn_api_pb2.InstructionRequest) -> None
+    self._execute(
+        lambda: beam_fn_api_pb2.InstructionResponse(
+            instruction_id=request.instruction_id,
+            monitoring_infos=beam_fn_api_pb2.MonitoringInfosMetadataResponse(
+                monitoring_info=SHORT_ID_CACHE.getInfos(
+                    request.monitoring_infos.monitoring_info_id))),
+        request)
+
   def _request_execute(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
     def task():
@@ -733,18 +759,6 @@ class SdkWorker(object):
                 for info in monitoring_infos
             }))
 
-  def monitoring_infos_request(
-      self,
-      request,  # type: beam_fn_api_pb2.MonitoringInfosMetadataRequest
-      instruction_id  # type: str
-  ):
-    # type: (...) -> beam_fn_api_pb2.InstructionResponse
-    return beam_fn_api_pb2.InstructionResponse(
-        instruction_id=instruction_id,
-        monitoring_infos=beam_fn_api_pb2.MonitoringInfosMetadataResponse(
-            monitoring_info=SHORT_ID_CACHE.getInfos(
-                request.monitoring_info_id)))
-
   def finalize_bundle(
       self,
       request,  # type: beam_fn_api_pb2.FinalizeBundleRequest
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c65665c..c1cf641 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -36,6 +36,9 @@ import hamcrest as hc
 import mock
 
 from apache_beam.coders import VarIntCoder
+from apache_beam.internal.metrics.metric import Metrics as InternalMetrics
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -207,6 +210,63 @@ class SdkWorkerTest(unittest.TestCase):
             instruction_id='split_instruction_id',
             process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitResponse()))
 
+  def get_responses(self, instruction_requests):
+    """Evaluates and returns {id: InstructionResponse} for the requests."""
+    test_controller = BeamFnControlServicer(instruction_requests)
+
+    server = grpc.server(thread_pool_executor.shared_unbounded_instance())
+    beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
+        test_controller, server)
+    test_port = server.add_insecure_port("[::]:0")
+    server.start()
+
+    harness = sdk_worker.SdkHarness(
+        "localhost:%s" % test_port, state_cache_size=100)
+    harness.run()
+    return test_controller.responses
+
+  def test_harness_monitoring_infos_and_metadata(self):
+    # Clear the process wide metric container.
+    MetricsEnvironment.process_wide_container().reset()
+    # Create a process_wide metric.
+    urn = 'my.custom.urn'
+    labels = {'key': 'value'}
+    InternalMetrics.counter(urn=urn, labels=labels, process_wide=True).inc(10)
+
+    harness_monitoring_infos_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id="monitoring_infos",
+        harness_monitoring_infos=beam_fn_api_pb2.HarnessMonitoringInfosRequest(
+        ))
+
+    responses = self.get_responses([harness_monitoring_infos_request])
+
+    expected_monitoring_info = monitoring_infos.int64_counter(
+        urn, 10, labels=labels)
+    monitoring_data = (
+        responses['monitoring_infos'].harness_monitoring_infos.monitoring_data)
+
+    # Request the full MonitoringInfo metadata for the returned short_ids.
+    short_ids = list(monitoring_data.keys())
+    monitoring_infos_metadata_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id="monitoring_infos_metadata",
+        monitoring_infos=beam_fn_api_pb2.MonitoringInfosMetadataRequest(
+            monitoring_info_id=short_ids))
+
+    responses = self.get_responses([monitoring_infos_metadata_request])
+
+    # Request the full MonitoringInfo metadata to be returned now.
+    expected_monitoring_info.ClearField("payload")
+
+    # Verify that one of the returned monitoring infos is our expected
+    # monitoring info.
+    short_id_to_mi = (
+        responses['monitoring_infos_metadata'].monitoring_infos.monitoring_info)
+    found = False
+    for mi in short_id_to_mi.values():
+      if mi == expected_monitoring_info:
+        found = True
+    self.assertTrue(found, str(responses['monitoring_infos_metadata']))
+
   def test_failed_bundle_processor_returns_failed_split_response(self):
     bundle_processor = mock.MagicMock()
     bundle_processor_cache = BundleProcessorCache(None, None, {})