You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/24 20:52:52 UTC

[GitHub] [beam] HuangLED commented on a change in pull request #11514: [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

HuangLED commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414856094



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -337,43 +337,48 @@ def add_receiver(self, operation, output_index=0):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
-  def monitoring_infos(self, transform_id):
-    # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+  def monitoring_infos(self, transform_id, pcollection_ids):
+    # type: (str, list(str)) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
     """Returns the list of MonitoringInfos collected by this operation."""
     all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
     all_monitoring_infos.update(
-        self.pcollection_count_monitoring_infos(transform_id))
+        self.pcollection_count_monitoring_infos(pcollection_ids))
     all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
     return all_monitoring_infos
 
-  def pcollection_count_monitoring_infos(self, transform_id):
+  def pcollection_count_monitoring_infos(self, pcollection_ids):
     """Returns the element count MonitoringInfo collected by this operation."""
-    if len(self.receivers) == 1:
-      # If there is exactly one output, we can unambiguously
-      # fix its name later, which we do.
-      # TODO(robertwb): Plumb the actual name here.
+    if len(self.receivers) != len(pcollection_ids):
+      raise RuntimeError(
+          'Unexpected number of receivers for number of pcollections %s %s' %
+          (self.receivers, pcollection_ids))
+
+    all_monitoring_infos = {}
+    for i in range(len(self.receivers)):

Review comment:
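       +1 to using zip here, i.e. iterating over `zip(self.receivers, pcollection_ids)` rather than indexing with `range(len(self.receivers))`.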




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org