You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/23 23:36:59 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

lukecwik commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414193484



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:
-          return
-
-        pcollection_name = (
-            self.process_bundle_descriptor.transforms[ptransform_label].
-            outputs[tag_label])
-
-        monitoring_info.labels[
-            monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
-        # Cleaning up labels that are not in specification.
-        monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
-        monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
-    for mi in infos_list:
-      inject_pcollection(mi)
-
-    return infos_list
+      pcollection_ids = self.process_bundle_descriptor.transforms[

Review comment:
       I believe the PCollection id order aligns with the receiver order, since `transform_consumers` (built above) iterates over the outputs map in the same order, and that ordering gets plumbed down through to `Operation`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org