You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/12 22:10:39 UTC

[beam] branch release-2.21.0 updated: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 9aceef9  [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514)
     new 94fc805  Merge pull request #11687 from ibzib/BEAM-9488
9aceef9 is described below

commit 9aceef984c2d2ced55148c9624036955cffb5698
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Sat Apr 25 21:52:20 2020 -0700

    [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514)
    
    * [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.
    
    * fixup! Convert to list since values() isn't subscriptable
    
    * fixup! Use zip
    
    * fixup! Migrate to use tag -> pcollection id
    
    * fixup! lint
    
    * fixup! Fix comparison
---
 .../python/apache_beam/metrics/monitoring_infos.py |  60 +++++------
 .../apache_beam/runners/worker/bundle_processor.py |  57 +----------
 .../apache_beam/runners/worker/operations.pxd      |   4 +-
 .../apache_beam/runners/worker/operations.py       | 110 ++++++++++++---------
 4 files changed, 94 insertions(+), 137 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2d1524c..92eb9de 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -78,7 +78,6 @@ PTRANSFORM_LABEL = (
 NAMESPACE_LABEL = (
     common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
 NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
-TAG_LABEL = "TAG"
 
 
 def extract_counter_value(monitoring_info_proto):
@@ -113,26 +112,26 @@ def extract_distribution(monitoring_info_proto):
       coders.VarIntCoder(), monitoring_info_proto.payload)
 
 
-def create_labels(ptransform=None, tag=None, namespace=None, name=None):
-  """Create the label dictionary based on the provided tags.
+def create_labels(ptransform=None, namespace=None, name=None, pcollection=None):
+  """Create the label dictionary based on the provided values.
 
   Args:
-    ptransform: The ptransform/step name.
-    tag: he output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
+    pcollection: The pcollection id used as a label.
   """
   labels = {}
-  if tag:
-    labels[TAG_LABEL] = tag
   if ptransform:
     labels[PTRANSFORM_LABEL] = ptransform
   if namespace:
     labels[NAMESPACE_LABEL] = namespace
   if name:
     labels[NAME_LABEL] = name
+  if pcollection:
+    labels[PCOLLECTION_LABEL] = pcollection
   return labels
 
 
-def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_counter(namespace, name, metric, ptransform=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return the counter monitoring info for the specifed URN, metric and labels.
@@ -140,18 +139,16 @@ def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
   Args:
     urn: The URN of the monitoring info/metric.
     metric: The payload field to use in the monitoring info or an int value.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
   """
-  labels = create_labels(
-      ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+  labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
   if isinstance(metric, int):
     metric = coders.VarIntCoder().encode(metric)
   return create_monitoring_info(
       USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)
 
 
-def int64_counter(urn, metric, ptransform=None, tag=None):
+def int64_counter(urn, metric, ptransform=None, pcollection=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return the counter monitoring info for the specifed URN, metric and labels.
@@ -159,33 +156,31 @@ def int64_counter(urn, metric, ptransform=None, tag=None):
   Args:
     urn: The URN of the monitoring info/metric.
     metric: The payload field to use in the monitoring info or an int value.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
+    pcollection: The pcollection id used as a label.
   """
-  labels = create_labels(ptransform=ptransform, tag=tag)
+  labels = create_labels(ptransform=ptransform, pcollection=pcollection)
   if isinstance(metric, int):
     metric = coders.VarIntCoder().encode(metric)
   return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
 
 
-def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_distribution(namespace, name, metric, ptransform=None):
   """Return the distribution monitoring info for the URN, metric and labels.
 
   Args:
     urn: The URN of the monitoring info/metric.
     metric: The DistributionData for the metric.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
   """
-  labels = create_labels(
-      ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+  labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
   payload = _encode_distribution(
       coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
   return create_monitoring_info(
       USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
 
 
-def int64_distribution(urn, metric, ptransform=None, tag=None):
+def int64_distribution(urn, metric, ptransform=None, pcollection=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return a distribution monitoring info for the URN, metric and labels.
@@ -193,16 +188,16 @@ def int64_distribution(urn, metric, ptransform=None, tag=None):
   Args:
     urn: The URN of the monitoring info/metric.
     metric: The DistributionData for the metric.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
+    pcollection: The pcollection id used as a label.
   """
-  labels = create_labels(ptransform=ptransform, tag=tag)
+  labels = create_labels(ptransform=ptransform, pcollection=pcollection)
   payload = _encode_distribution(
       coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
   return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
 
 
-def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_gauge(namespace, name, metric, ptransform=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return the gauge monitoring info for the URN, metric and labels.
@@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
     namespace: User-defined namespace of counter.
     name: Name of counter.
     metric: The GaugeData containing the metrics.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
   """
-  labels = create_labels(
-      ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+  labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
   if isinstance(metric, GaugeData):
     coder = coders.VarIntCoder()
     value = metric.value
@@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
       USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels)
 
 
-def int64_gauge(urn, metric, ptransform=None, tag=None):
+def int64_gauge(urn, metric, ptransform=None):
   # type: (...) -> metrics_pb2.MonitoringInfo
 
   """Return the gauge monitoring info for the URN, metric and labels.
@@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None):
     urn: The URN of the monitoring info/metric.
     metric: An int representing the value. The current time will be used for
             the timestamp.
-    ptransform: The ptransform/step name used as a label.
-    tag: The output tag name, used as a label.
+    ptransform: The ptransform id used as a label.
   """
-  labels = create_labels(ptransform=ptransform, tag=tag)
+  labels = create_labels(ptransform=ptransform)
   if isinstance(metric, int):
     value = metric
     time_ms = int(time.time()) * 1000
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2cd69c3..82359c2 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -1076,59 +1076,12 @@ class BundleProcessor(object):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:
-          return
-
-        pcollection_name = (
-            self.process_bundle_descriptor.transforms[ptransform_label].
-            outputs[tag_label])
-
-        monitoring_info.labels[
-            monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
-        # Cleaning up labels that are not in specification.
-        monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
-        monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
-    for mi in infos_list:
-      inject_pcollection(mi)
-
-    return infos_list
+      tag_to_pcollection_id = self.process_bundle_descriptor.transforms[
+          transform_id].outputs
+      all_monitoring_infos_dict.update(
+          op.monitoring_infos(transform_id, tag_to_pcollection_id))
 
-  def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
-    # type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo
-    actual_output_tags = list(
-        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
-    if ('TAG' in monitoring_info.labels and
-        monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
-      if len(actual_output_tags) == 1:
-        monitoring_info.labels['TAG'] = actual_output_tags[0]
-    return monitoring_info
+    return list(all_monitoring_infos_dict.values())
 
   def shutdown(self):
     # type: () -> None
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 418cdcc..36fa809 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -73,8 +73,8 @@ cdef class Operation(object):
   cpdef output(self, WindowedValue windowed_value, int output_index=*)
   cpdef execution_time_monitoring_infos(self, transform_id)
   cpdef user_monitoring_infos(self, transform_id)
-  cpdef pcollection_count_monitoring_infos(self, transform_id)
-  cpdef monitoring_infos(self, transform_id)
+  cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id)
+  cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id)
 
 
 cdef class ReadOperation(Operation):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 66f12f2..2a27136 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -337,43 +337,49 @@ class Operation(object):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+  def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+    # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     """Returns the list of MonitoringInfos collected by this operation."""
     all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
     all_monitoring_infos.update(
-        self.pcollection_count_monitoring_infos(transform_id))
+        self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
     all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
     return all_monitoring_infos
 
-  def pcollection_count_monitoring_infos(self, transform_id):
+  def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
+    # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+
     """Returns the element count MonitoringInfo collected by this operation."""
-    if len(self.receivers) == 1:
-      # If there is exactly one output, we can unambiguously
-      # fix its name later, which we do.
-      # TODO(robertwb): Plumb the actual name here.
-      elem_count_mi = monitoring_infos.int64_counter(
-          monitoring_infos.ELEMENT_COUNT_URN,
-          self.receivers[0].opcounter.element_counter.value(),
-          ptransform=transform_id,
-          tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
-      )
-
-      (unused_mean, sum, count, min, max) = (
-          self.receivers[0].opcounter.mean_byte_counter.value())
-
-      sampled_byte_count = monitoring_infos.int64_distribution(
-          monitoring_infos.SAMPLED_BYTE_SIZE_URN,
-          DistributionData(sum, count, min, max),
-          ptransform=transform_id,
-          tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
-      )
-      return {
-          monitoring_infos.to_key(elem_count_mi): elem_count_mi,
-          monitoring_infos.to_key(sampled_byte_count): sampled_byte_count
-      }
-    return {}
+
+    # Skip producing monitoring infos if there is more than one receiver
+    # since there is no way to provide a mapping from tag to pcollection id
+    # within Operation.
+    if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1:
+      return {}
+
+    all_monitoring_infos = {}
+    pcollection_id = next(iter(tag_to_pcollection_id.values()))
+    receiver = self.receivers[0]
+    elem_count_mi = monitoring_infos.int64_counter(
+        monitoring_infos.ELEMENT_COUNT_URN,
+        receiver.opcounter.element_counter.value(),
+        pcollection=pcollection_id,
+    )
+
+    (unused_mean, sum, count, min, max) = (
+        receiver.opcounter.mean_byte_counter.value())
+
+    sampled_byte_count = monitoring_infos.int64_distribution(
+        monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+        DistributionData(sum, count, min, max),
+        pcollection=pcollection_id,
+    )
+    all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
+    all_monitoring_infos[monitoring_infos.to_key(
+        sampled_byte_count)] = sampled_byte_count
+
+    return all_monitoring_infos
 
   def user_monitoring_infos(self, transform_id):
     """Returns the user MonitoringInfos collected by this operation."""
@@ -709,25 +715,31 @@ class DoOperation(Operation):
       self.user_state_context.reset()
     self.dofn_runner.bundle_finalizer_param.reset()
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
-    infos = super(DoOperation, self).monitoring_infos(transform_id)
+  def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
+    # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+
+    """Returns the element count MonitoringInfo collected by this operation."""
+    infos = super(
+        DoOperation,
+        self).pcollection_count_monitoring_infos(tag_to_pcollection_id)
+
     if self.tagged_receivers:
       for tag, receiver in self.tagged_receivers.items():
-        mi = monitoring_infos.int64_counter(
-            monitoring_infos.ELEMENT_COUNT_URN,
-            receiver.opcounter.element_counter.value(),
-            ptransform=transform_id,
-            tag=str(tag))
-        infos[monitoring_infos.to_key(mi)] = mi
-        (unused_mean, sum, count, min, max) = (
-            receiver.opcounter.mean_byte_counter.value())
-        sampled_byte_count = monitoring_infos.int64_distribution(
-            monitoring_infos.SAMPLED_BYTE_SIZE_URN,
-            DistributionData(sum, count, min, max),
-            ptransform=transform_id,
-            tag=str(tag))
-        infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
+        pcollection_id = tag_to_pcollection_id[str(tag)]
+        if pcollection_id:
+          mi = monitoring_infos.int64_counter(
+              monitoring_infos.ELEMENT_COUNT_URN,
+              receiver.opcounter.element_counter.value(),
+              pcollection=pcollection_id)
+          infos[monitoring_infos.to_key(mi)] = mi
+          (unused_mean, sum, count, min, max) = (
+              receiver.opcounter.mean_byte_counter.value())
+          sampled_byte_count = monitoring_infos.int64_distribution(
+              monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+              DistributionData(sum, count, min, max),
+              pcollection=pcollection_id)
+          infos[monitoring_infos.to_key(
+              sampled_byte_count)] = sampled_byte_count
     return infos
 
 
@@ -778,8 +790,8 @@ class SdfProcessSizedElements(DoOperation):
               self.element_start_output_bytes)
       return None
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+  def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+    # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     def encode_progress(value):
       # type: (float) -> bytes
@@ -788,7 +800,7 @@ class SdfProcessSizedElements(DoOperation):
 
     with self.lock:
       infos = super(SdfProcessSizedElements,
-                    self).monitoring_infos(transform_id)
+                    self).monitoring_infos(transform_id, tag_to_pcollection_id)
       current_element_progress = self.current_element_progress()
       if current_element_progress:
         if current_element_progress.completed_work: