Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/23 23:27:04 UTC

[GitHub] [beam] lukecwik opened a new pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

lukecwik opened a new pull request #11514:
URL: https://github.com/apache/beam/pull/11514


   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619228095


   I looked through the implementation, and adding the PCollection id to the ConsumerSet doesn't seem to work out: operations don't have that level of visibility into the pipeline proto, and consumers work off of an index -> receiver map and expect tags to be mapped to indices, so we would need to go through all three layers. I suggest that we stick with this brittle approach until we can delete the non-portable Python worker implementation, which would make a lot of these layers simpler.
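To make the layering concrete, here is a minimal sketch of an operation whose consumers are keyed only by output index, with tags resolved to indices separately. The class and attribute names (`SketchOperation`, `tag_to_index`) are hypothetical and only mirror the shape described above; they are not Beam's actual `Operation` or `ConsumerSet` API.

```python
from collections import defaultdict


class SketchOperation:
    """Hypothetical operation: receivers are stored by output index only."""

    def __init__(self, tags):
        # Tags are resolved to positional indices once, up front.
        self.tag_to_index = {tag: i for i, tag in enumerate(tags)}
        # index -> list of downstream receivers; no PCollection id is stored.
        self.consumers = defaultdict(list)

    def add_receiver(self, receiver, output_index=0):
        self.consumers[output_index].append(receiver)

    def output(self, tag, value):
        # Only the index survives to this layer, so a PCollection id would
        # have to be threaded through every layer to be known here.
        for receiver in self.consumers[self.tag_to_index[tag]]:
            receiver.append(value)


main, side = [], []
op = SketchOperation(tags=['main', 'side'])
op.add_receiver(main, output_index=0)
op.add_receiver(side, output_index=1)
op.output('side', 42)
print(main, side)  # [] [42]
```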





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-618722327


   R: @HuangLED @robertwb 





[GitHub] [beam] pabloem commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-620325962


   I suspect that this made the precommits flaky.





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-618734970


   Run Portable_Python PreCommit





[GitHub] [beam] lukecwik removed a comment on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-620332113


   The breakage happened before this was merged.





[GitHub] [beam] robertwb commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414208960



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:

Review comment:
       Likewise this. 

##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -337,43 +337,48 @@ def add_receiver(self, operation, output_index=0):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+  def monitoring_infos(self, transform_id, pcollection_ids):
+    # type: (str, list(str)) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     """Returns the list of MonitoringInfos collected by this operation."""
     all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
     all_monitoring_infos.update(
-        self.pcollection_count_monitoring_infos(transform_id))
+        self.pcollection_count_monitoring_infos(pcollection_ids))
     all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
     return all_monitoring_infos
 
-  def pcollection_count_monitoring_infos(self, transform_id):
+  def pcollection_count_monitoring_infos(self, pcollection_ids):
     """Returns the element count MonitoringInfo collected by this operation."""
-    if len(self.receivers) == 1:
-      # If there is exactly one output, we can unambiguously
-      # fix its name later, which we do.
-      # TODO(robertwb): Plumb the actual name here.
+    if len(self.receivers) != len(pcollection_ids):
+      raise RuntimeError(
+          'Unexpected number of receivers for number of pcollections %s %s' %
+          (self.receivers, pcollection_ids))
+
+    all_monitoring_infos = {}
+    for i in range(len(self.receivers)):

Review comment:
       This will change if you use a mapping, but `zip` would be the idiom to use here. 
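A minimal sketch of the `zip` idiom suggested above, pairing each receiver with its PCollection id instead of indexing with `range(len(...))`. Receivers are modeled by a hypothetical `FakeReceiver` with an `element_count` attribute, and the returned dict keyed by PCollection id stands in for the real `MonitoringInfo` protos.

```python
from dataclasses import dataclass


@dataclass
class FakeReceiver:
    # Hypothetical stand-in for a receiver that counts the elements it saw.
    element_count: int


def pcollection_counts(receivers, pcollection_ids):
    # zip silently truncates on a length mismatch, so keep the explicit check.
    if len(receivers) != len(pcollection_ids):
        raise RuntimeError(
            'Unexpected number of receivers for number of pcollections %s %s' %
            (receivers, pcollection_ids))
    # zip pairs the i-th receiver with the i-th id without manual indexing.
    return {
        pcollection_id: receiver.element_count
        for receiver, pcollection_id in zip(receivers, pcollection_ids)
    }


print(pcollection_counts([FakeReceiver(3), FakeReceiver(5)], ['pc1', 'pc2']))
# {'pc1': 3, 'pc2': 5}
```

On Python 3.10 and later, `zip(..., strict=True)` raises `ValueError` on a length mismatch and could replace the explicit check.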

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:
-          return
-
-        pcollection_name = (
-            self.process_bundle_descriptor.transforms[ptransform_label].
-            outputs[tag_label])
-
-        monitoring_info.labels[
-            monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
-        # Cleaning up labels that are not in specification.
-        monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
-        monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
-    for mi in infos_list:
-      inject_pcollection(mi)
-
-    return infos_list
+      pcollection_ids = self.process_bundle_descriptor.transforms[

Review comment:
       In practice this might be OK (dict iteration order is undefined, though I think it is deterministic as long as the dict isn't modified), but it seems rather brittle to me. Could we instead pass the tag -> pcollection_id mapping here?
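The mapping-based alternative can be sketched as follows: instead of two parallel lists whose alignment depends on iteration order, the caller passes a dict from output tag to PCollection id (such as a transform's `outputs` map) and counts are paired by tag. The per-tag `counts_by_tag` input is an assumption made for illustration, not something the worker currently produces.

```python
def pcollection_counts_by_tag(counts_by_tag, tag_to_pcollection_id):
    """Map per-tag element counts to PCollection ids.

    counts_by_tag: hypothetical {output tag: element count} collected by an
    operation. tag_to_pcollection_id: {output tag: PCollection id}, e.g. a
    PTransform's outputs map.
    """
    return {
        tag_to_pcollection_id[tag]: count
        for tag, count in counts_by_tag.items()
    }


outputs = {'main': 'pc_main', 'errors': 'pc_errors'}
# The counts are in a different order than `outputs`, but the result is still
# correct because pairing is by tag, not by position.
counts = {'errors': 1, 'main': 10}
print(pcollection_counts_by_tag(counts, outputs))
# {'pc_errors': 1, 'pc_main': 10}
```

A tag with no entry in the outputs map raises `KeyError` here; this sketch does not decide how such tags should be handled.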

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return

Review comment:
       This would be a bug, right?

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:

Review comment:
       Actually, this can happen, and might be what's happening here. There is no PCollection for this tag, but the user outputted a value to this tag. It would make sense to record this output even if we didn't use it. This is another downside of attaching these counters to PCollections themselves rather than to PTransform outputs. 
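One way to record such outputs instead of dropping them, sketched under the assumption that counts are available per output tag: counts whose tag maps to a PCollection are attributed to that PCollection, and the rest are kept keyed by PTransform output, i.e. (transform id, tag). The function name and its two-dict return shape are hypothetical, not the format used by the Beam worker.

```python
def split_output_counts(transform_id, counts_by_tag, tag_to_pcollection_id):
    by_pcollection = {}
    unmatched_outputs = {}
    for tag, count in counts_by_tag.items():
        pcollection_id = tag_to_pcollection_id.get(tag)
        if pcollection_id is None:
            # The user emitted to a tag that has no PCollection; keep the
            # count keyed by the PTransform output rather than dropping it.
            unmatched_outputs[(transform_id, tag)] = count
        else:
            by_pcollection[pcollection_id] = count
    return by_pcollection, unmatched_outputs


known, unknown = split_output_counts(
    'ParDo(Split)', {'main': 7, 'debug': 2}, {'main': 'pc_main'})
print(known)    # {'pc_main': 7}
print(unknown)  # {('ParDo(Split)', 'debug'): 2}
```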







[GitHub] [beam] lukecwik commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414265186



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return

Review comment:
       yeah







[GitHub] [beam] lukecwik commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414193484



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:
-          return
-
-        pcollection_name = (
-            self.process_bundle_descriptor.transforms[ptransform_label].
-            outputs[tag_label])
-
-        monitoring_info.labels[
-            monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
-        # Cleaning up labels that are not in specification.
-        monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
-        monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
-    for mi in infos_list:
-      inject_pcollection(mi)
-
-    return infos_list
+      pcollection_ids = self.process_bundle_descriptor.transforms[

Review comment:
       I believe the id order aligns with the receiver order, since transform_consumers (built above) iterates over the outputs map in the same order, and this gets plumbed down through to the Operation.
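The ordering argument can be illustrated with a plain Python dict (insertion-ordered since Python 3.7): if the receiver list and the PCollection id list are both produced by iterating the same, unmodified `outputs` mapping, position i in one corresponds to position i in the other. This is a simplified model, not the actual `transform_consumers` code; the second half shows why the approach is called brittle, since deriving either list from a different ordering silently breaks the pairing.

```python
outputs = {'main': 'pc_main', 'side': 'pc_side', 'errors': 'pc_errors'}

# Receivers built by iterating the outputs map.
receivers = ['receiver_for_%s' % tag for tag in outputs]
# PCollection ids read from the same map, in the same iteration order.
pcollection_ids = list(outputs.values())

aligned = dict(zip(receivers, pcollection_ids))
print(aligned['receiver_for_side'])  # pc_side

# If one side is derived from a different ordering (here, sorted tags),
# the positional pairing goes wrong without any error.
misaligned = dict(zip(receivers, [outputs[t] for t in sorted(outputs)]))
print(misaligned['receiver_for_side'])  # pc_main
```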







[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-618782738


   > I suppose this would be to finish the transition from reporting counters on PTransform outputs to recording them on the various PCollections.
   > 
   > LGTM if you can change to use a mapping of tags to pcoll ids rather than relying on ordering being the same.
   
   I'll try it out, but I worry that the consumers/receivers use either indices or the tag names after string conversion, since Python does some post-processing that converts the string tags to non-string tags.
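The concern can be illustrated with a hypothetical `normalize_tag` that stands in for whatever conversion the worker applies to tags. Its behavior here (turning the string `'None'` into `None`) is an assumption for illustration, not taken from the Beam source. A mapping keyed by the original string tags misses lookups made with converted tags unless the same conversion is applied to its keys.

```python
def normalize_tag(tag):
    # Hypothetical conversion, assumed for illustration only.
    return None if tag == 'None' else tag


# String tags, as they would appear in a transform's outputs map.
outputs = {'None': 'pc_main', 'side': 'pc_side'}

worker_tag = normalize_tag('None')  # the tag the worker-side code would see
print(worker_tag in outputs)  # False: the naive lookup misses

# Applying the same conversion to the mapping's keys restores the lookup.
normalized_outputs = {normalize_tag(t): pc for t, pc in outputs.items()}
print(normalized_outputs[worker_tag])  # pc_main
```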





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619428364


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-620332113


   The breakage happened before this was merged.





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619286690


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619306092


   Run Python PreCommit





[GitHub] [beam] pabloem commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-620326073


   see: https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel





[GitHub] [beam] lukecwik commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414265121



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
     # Construct a new dict first to remove duplicates.
     all_monitoring_infos_dict = {}
     for transform_id, op in self.ops.items():
-      for mi in op.monitoring_infos(transform_id).values():
-        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
-        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
-    infos_list = list(all_monitoring_infos_dict.values())
-
-    def inject_pcollection(monitoring_info):
-      """
-      If provided metric is element count metric:
-      Finds relevant transform output info in current process_bundle_descriptor
-      and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
-      info.
-      """
-      if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
-        if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
-          return
-        ptransform_label = monitoring_info.labels[
-            monitoring_infos.PTRANSFORM_LABEL]
-        if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
-          return
-        tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
-        if not ptransform_label in self.process_bundle_descriptor.transforms:
-          return
-        if not tag_label in self.process_bundle_descriptor.transforms[
-            ptransform_label].outputs:

Review comment:
       Unknown outputs should probably be reported another way.
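The removed `inject_pcollection` helper simply returned when a tag was missing from the transform's outputs, so unknown outputs vanished without a trace. Below is a minimal sketch, under stated assumptions, of resolving output tags to PCollection ids while collecting the unknown tags so they can be reported separately. The descriptor is modeled as a plain dict (transform id to `{tag: pcollection_id}`), and `pcollection_ids_for` is a hypothetical helper, not part of Beam's API.

```python
from typing import Dict, List, Tuple

# Hypothetical, simplified stand-in for a process bundle descriptor:
# transform id -> {output tag: PCollection id}.
TransformOutputs = Dict[str, Dict[str, str]]


def pcollection_ids_for(transforms: TransformOutputs, transform_id: str,
                        tags: List[str]) -> Tuple[List[str], List[str]]:
  """Resolves output tags of one transform to PCollection ids.

  Returns (resolved_ids, unknown_tags) so the caller can report unknown
  outputs explicitly instead of silently dropping them.
  """
  outputs = transforms.get(transform_id, {})
  resolved, unknown = [], []
  for tag in tags:
    if tag in outputs:
      resolved.append(outputs[tag])
    else:
      # Keep track of tags the descriptor does not know about.
      unknown.append(tag)
  return resolved, unknown
```

For example, `pcollection_ids_for({'ParDo1': {'out': 'pc1'}}, 'ParDo1', ['out', 'missing'])` returns `(['pc1'], ['missing'])`.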




[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619448742


   Run Python PreCommit


[GitHub] [beam] lukecwik removed a comment on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619298111






[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619298111


   Run Python PreCommit


[GitHub] [beam] lukecwik commented on pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11514:
URL: https://github.com/apache/beam/pull/11514#issuecomment-619444646


   Run Python PreCommit


[GitHub] [beam] HuangLED commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

Posted by GitBox <gi...@apache.org>.
HuangLED commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414856094



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -337,43 +337,48 @@ def add_receiver(self, operation, output_index=0):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+  def monitoring_infos(self, transform_id, pcollection_ids):
+    # type: (str, list(str)) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     """Returns the list of MonitoringInfos collected by this operation."""
     all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
     all_monitoring_infos.update(
-        self.pcollection_count_monitoring_infos(transform_id))
+        self.pcollection_count_monitoring_infos(pcollection_ids))
     all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
     return all_monitoring_infos
 
-  def pcollection_count_monitoring_infos(self, transform_id):
+  def pcollection_count_monitoring_infos(self, pcollection_ids):
     """Returns the element count MonitoringInfo collected by this operation."""
-    if len(self.receivers) == 1:
-      # If there is exactly one output, we can unambiguously
-      # fix its name later, which we do.
-      # TODO(robertwb): Plumb the actual name here.
+    if len(self.receivers) != len(pcollection_ids):
+      raise RuntimeError(
+          'Unexpected number of receivers for number of pcollections %s %s' %
+          (self.receivers, pcollection_ids))
+
+    all_monitoring_infos = {}
+    for i in range(len(self.receivers)):

Review comment:
       +1 to using zip.
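A minimal sketch of the zip-based loop suggested above, assuming a hypothetical `CountingReceiver` with an `element_count` attribute and returning a plain `{pcollection_id: count}` dict instead of real `MonitoringInfo` protos:

```python
from typing import Dict, List


class CountingReceiver(object):
  """Hypothetical stand-in for a receiver that tracks an element count."""

  def __init__(self, element_count):
    self.element_count = element_count


def pcollection_count_monitoring_infos(receivers, pcollection_ids):
  # type: (List[CountingReceiver], List[str]) -> Dict[str, int]
  """Pairs each receiver with its PCollection id by position."""
  if len(receivers) != len(pcollection_ids):
    raise RuntimeError(
        'Unexpected number of receivers for number of pcollections %s %s' %
        (receivers, pcollection_ids))
  all_counts = {}
  # zip replaces the index-based range(len(...)) loop.
  for receiver, pcollection_id in zip(receivers, pcollection_ids):
    all_counts[pcollection_id] = receiver.element_count
  return all_counts
```

The explicit length check is still needed because `zip` stops at the shorter input and would silently drop unmatched receivers; on Python 3.10+, `zip(..., strict=True)` raises `ValueError` on a length mismatch instead.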



