Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/29 17:42:06 UTC

[GitHub] [beam] pabloem commented on a change in pull request #16841: Revert "Revert "Merge pull request #15441 from [BEAM-8823] Make FnApi…

pabloem commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r837741386



##########
File path: sdks/python/apache_beam/examples/matrix_power.py
##########
@@ -0,0 +1,103 @@
+#

Review comment:
       This particular test is one we had internally at Google; it surfaced a particular failure mode for the playground, which is why I'm including it.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -187,6 +222,9 @@ def __init__(self,
     self._windowing = windowing
     self._grouped_output = None  # type: Optional[List[List[bytes]]]
 
+  def copy(self) -> 'GroupingBuffer':
+    return self

Review comment:
       Hm, hehe, you're right. I added this to avoid copying more than necessary. Would it be okay if we leave this as is? The plan is to remove GroupingBuffer for streaming and do grouping as a user function, so this structure will go away.
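
   To illustrate the trade-off, a minimal standalone sketch (hypothetical Buffer class, not the Beam code): a real copy duplicates the buffered elements, while a copy() that returns self shares the underlying storage, so mutations through one reference are visible through every "copy".

       import copy

       class Buffer:
         def __init__(self, data=None):
           self._data = list(data or [])

         def append(self, elm):
           self._data.append(elm)

         def real_copy(self) -> 'Buffer':
           # Safe for independent mutation, but duplicates every element.
           return copy.deepcopy(self)

         def shared_copy(self) -> 'Buffer':
           # What a copy() that returns self amounts to: zero cost, shared state.
           return self

       b = Buffer([b'a'])
       c = b.shared_copy()
       c.append(b'b')
       assert b._data == [b'a', b'b']  # the "copy" and the original are the same object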

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue

Review comment:
       Is it okay if I leave this for a follow-up change?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -96,6 +108,27 @@
 # (MultiMap / Iterable).
 DataSideInput = Dict[SideInputId, Tuple[bytes, SideInputAccessPattern]]
 
+DataOutput = Dict[str, BufferId]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer ID, Time Domain for timer]
+# The time domain comes from beam_runner_api_pb2.TimeDomain. It may be
+# EVENT_TIME or PROCESSING_TIME.
+OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer CONTENTS, Timestamp]
+OutputTimerData = MutableMapping[TimerFamilyId,
+                                 Tuple['PartitionableBuffer',
+                                       timestamp.Timestamp]]
+
+BundleProcessResult = Tuple[beam_fn_api_pb2.InstructionResponse,
+                            List[beam_fn_api_pb2.ProcessBundleSplitResponse]]
+
+
+# TODO(pabloem): Change the name to a more representative one
+class DataInput(NamedTuple):

Review comment:
       Is it okay if I leave this for a follow-up change? Maybe something like StageInputBuffers - with explicit data and timers fields, I'd say it's self-documenting. Thoughts?
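
   To make the suggestion concrete, a minimal sketch (StageInputBuffers is just the proposed name, and the buffer type is simplified to Any; this is not existing Beam code):

       from typing import Any, Dict, NamedTuple, Tuple

       # Mirrors the TimerFamilyId alias above: (transform id, timer family id).
       TimerFamilyId = Tuple[str, str]

       class StageInputBuffers(NamedTuple):
         """All buffered inputs for one ProcessBundle execution of a stage."""
         # PCollection / data-input transform name -> buffer of encoded elements.
         data: Dict[str, Any]
         # (transform id, timer family id) -> buffer of encoded, fired timers.
         timers: Dict[TimerFamilyId, Any]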

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on time advancing. ((stage_name, time), inputs), where
+       the time is the real time point at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+  """
+  class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+    def __init__(self) -> None:
+      self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+                                  DataInput]] = collections.deque()
+      self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+                                           Tuple[QUEUE_KEY_TYPE,
+                                                 DataInput]] = {}
+
+    def enque(self, elm: Tuple[QUEUE_KEY_TYPE, DataInput]) -> None:
+      key = elm[0]
+      incoming_inputs: DataInput = elm[1]
+      if key in self._keyed_elements:
+        existing_inputs = self._keyed_elements[key][1]
+        for pcoll in incoming_inputs.data:

Review comment:
       Yes, that is possible. This is handled (there are tests that check timers).
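
   For readers following along, a self-contained sketch (plain dicts and byte lists instead of Beam buffers) of the merge-on-enqueue behavior under discussion: a second enqueue for the same stage extends the already-pending buffers rather than adding a new queue entry.

       import collections
       from typing import Deque, Dict, List, Tuple

       Entry = Tuple[str, Dict[str, List[bytes]]]

       class KeyedQueue:
         """Keeps at most one pending entry per key; later enqueues merge into it."""
         def __init__(self) -> None:
           self._q: Deque[Entry] = collections.deque()
           self._keyed_elements: Dict[str, Entry] = {}

         def enque(self, elm: Entry) -> None:
           key, incoming_inputs = elm
           if key in self._keyed_elements:
             existing_inputs = self._keyed_elements[key][1]
             for pcoll, elements in incoming_inputs.items():
               existing_inputs.setdefault(pcoll, []).extend(elements)
           else:
             self._q.appendleft(elm)
             self._keyed_elements[key] = elm

         def deque(self) -> Entry:
           elm = self._q.pop()
           del self._keyed_elements[elm[0]]
           return elm

         def __len__(self) -> int:
           return len(self._q)

       q = KeyedQueue()
       q.enque(('stage1', {'pcoll_a': [b'x']}))
       q.enque(('stage1', {'pcoll_a': [b'y'], 'pcoll_b': [b'z']}))
       assert len(q) == 1  # both enqueues merged into one pending bundle
       assert q.deque()[1] == {'pcoll_a': [b'x', b'y'], 'pcoll_b': [b'z']}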

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER

Review comment:
       I'd like to keep this as an assertion to verify proper formatting of the payloads - is that okay?
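
   For context, a small sketch of the buffer-id convention the assertion guards (assuming the usual 'kind:pcollection_id' shape handled by split_buffer_id; the IMPULSE payload is a sentinel that does not follow it):

       def split_buffer_id(buffer_id: bytes):
         """Splits a buffer id of the form b'kind:pcollection_id' into its parts."""
         kind, pcoll_id = buffer_id.decode('utf-8').split(':', 1)
         return kind, pcoll_id

       assert split_buffer_id(b'materialize:pcoll_1') == ('materialize', 'pcoll_1')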

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}

Review comment:
       Removed.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN

Review comment:
       Removed.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+          coder_id = self.data_channel_coders[only_element(
+              transform.inputs.values())]
+        # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace the
+        # payload with the GRPC configuration for the Data channel.
+        bundle_manager = self.bundle_manager_for(stage)
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = (
+            bundle_manager.data_api_service_descriptor())
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if payload.side_inputs:

Review comment:
       This is for the initial bundles only - we want to schedule initial bundles only for stages that don't have side inputs. I think this is reasonable?
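
   A small standalone sketch (FakeParDo and initially_schedulable are hypothetical names, not Beam code) of the rule described here: a stage is enqueued as an initial bundle only if none of its ParDos consume side inputs.

       from dataclasses import dataclass, field
       from typing import Dict, List

       @dataclass
       class FakeParDo:
         """Stand-in for a parsed ParDoPayload."""
         side_inputs: Dict[str, str] = field(default_factory=dict)

       def initially_schedulable(pardos: List[FakeParDo]) -> bool:
         # Stages with side inputs wait until the producers of those side
         # inputs have run; everything else can start from its IMPULSE input.
         return not any(p.side_inputs for p in pardos)

       assert initially_schedulable([FakeParDo()])
       assert not initially_schedulable([FakeParDo(side_inputs={'side0': 'view'})])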

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -180,16 +195,14 @@ def test_pardo_side_input_dependencies(self):
                 ExpectingSideInputsFn(f'Do{k}'),
                 *[beam.pvalue.AsList(inputs[s]) for s in range(1, k)]))
 
-  @unittest.skip('BEAM-13040')
-  @retry(stop=stop_after_attempt(3))

Review comment:
       Re-added.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1973,6 +2006,39 @@ def populate_data_channel_coders(stages, pipeline_context):
   return stages
 
 
+def add_impulse_to_dangling_transforms(stages, pipeline_context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+
+  """Add an IMPULSE input to transforms that have no inputs."""
+  for stage in stages:
+    for transform in stage.transforms:
+      if len(transform.inputs
+             ) == 0 and transform.spec.urn != bundle_processor.DATA_INPUT_URN:
+        print('transform! : ' + transform.spec.urn)

Review comment:
       Ah, good catch - removed.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -363,68 +364,164 @@ def run_stages(self,
             self.NUM_FUSED_STAGES_COUNTER,
             urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update(
                 len(stages))
-    monitoring_infos_by_stage = {}
+    monitoring_infos_by_stage: MutableMapping[
+        str, Iterable['metrics_pb2.MonitoringInfo']] = {}
 
     runner_execution_context = execution.FnApiRunnerExecutionContext(
         stages,
         worker_handler_manager,
         stage_context.components,
         stage_context.safe_coders,
-        stage_context.data_channel_coders)
+        stage_context.data_channel_coders,
+        self._num_workers)
 
     try:
       with self.maybe_profile():
-        for stage in stages:
-          bundle_context_manager = execution.BundleContextManager(
-              runner_execution_context, stage, self._num_workers)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong watermark for %s. Expected %s, but got %s.' % (
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name),
-                  timestamp.MAX_TIMESTAMP,
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name
-                  ).input_watermark()
-              )
-          )
-
-          stage_results = self._run_stage(
-              runner_execution_context, bundle_context_manager)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong input watermark for %s. Expected %s, but got %s.' % (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name),
-              timestamp.MAX_TIMESTAMP,
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).output_watermark())
-          )
-
-          monitoring_infos_by_stage[stage.name] = (
-              list(stage_results.process_bundle.monitoring_infos))
-
-      monitoring_infos_by_stage[''] = list(
-          pipeline_metrics.to_runner_api_monitoring_infos('').values())
+        # Initialize Runner context:
+        # - Pipeline dictionaries, initial inputs and pipeline triggers
+        # - Replace Data API endpoints in protobufs.
+        runner_execution_context.setup()
+
+        bundle_counter = 0
+        # Start executing all ready bundles.
+        while len(runner_execution_context.queues.ready_inputs) > 0:
+          _LOGGER.debug(
+              "Remaining ready bundles: %s\n"
+              "\tWatermark pending bundles: %s\n"
+              "\tTime pending bundles: %s",
+              len(runner_execution_context.queues.ready_inputs),
+              len(runner_execution_context.queues.watermark_pending_inputs),
+              len(runner_execution_context.queues.time_pending_inputs))
+          consuming_stage_name, bundle_input = (
+              runner_execution_context.queues.ready_inputs.deque())
+          stage = runner_execution_context.stages[consuming_stage_name]
+          bundle_context_manager = runner_execution_context.bundle_manager_for(
+              stage)
+          _BUNDLE_LOGGER.debug(
+              'Running bundle for stage %s\n\tExpected outputs: %s timers: %s',
+              bundle_context_manager.stage.name,
+              bundle_context_manager.stage_data_outputs,
+              bundle_context_manager.stage_timer_outputs)
+          assert consuming_stage_name == bundle_context_manager.stage.name
+
+          bundle_counter += 1
+          bundle_results = self._execute_bundle(
+              runner_execution_context, bundle_context_manager, bundle_input)
+
+          if consuming_stage_name in monitoring_infos_by_stage:
+            monitoring_infos_by_stage[
+                consuming_stage_name] = consolidate_monitoring_infos(
+                    itertools.chain(
+                        bundle_results.process_bundle.monitoring_infos,
+                        monitoring_infos_by_stage[consuming_stage_name]))
+          else:
+            assert isinstance(
+                bundle_results.process_bundle.monitoring_infos, Iterable)
+            monitoring_infos_by_stage[consuming_stage_name] = \
+              bundle_results.process_bundle.monitoring_infos
+
+          if '' not in monitoring_infos_by_stage:

Review comment:
       We're keeping these no-stage monitoring infos for whole-pipeline metrics.
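
   To make the convention concrete, a tiny sketch (simplified value types, hypothetical record helper): per-stage metrics are keyed by stage name, while pipeline-wide metrics live under the empty-string key.

       from typing import Dict, List

       monitoring_infos_by_stage: Dict[str, List[str]] = {}

       def record(stage_name: str, infos: List[str]) -> None:
         monitoring_infos_by_stage.setdefault(stage_name, []).extend(infos)

       record('stage-1', ['data_channel.read_elements'])
       record('', ['num_fused_stages'])  # '' holds whole-pipeline metrics
       assert set(monitoring_infos_by_stage) == {'stage-1', ''}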

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -363,68 +364,164 @@ def run_stages(self,
             self.NUM_FUSED_STAGES_COUNTER,
             urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update(
                 len(stages))
-    monitoring_infos_by_stage = {}
+    monitoring_infos_by_stage: MutableMapping[
+        str, Iterable['metrics_pb2.MonitoringInfo']] = {}
 
     runner_execution_context = execution.FnApiRunnerExecutionContext(
         stages,
         worker_handler_manager,
         stage_context.components,
         stage_context.safe_coders,
-        stage_context.data_channel_coders)
+        stage_context.data_channel_coders,
+        self._num_workers)
 
     try:
       with self.maybe_profile():
-        for stage in stages:
-          bundle_context_manager = execution.BundleContextManager(
-              runner_execution_context, stage, self._num_workers)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong watermark for %s. Expected %s, but got %s.' % (
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name),
-                  timestamp.MAX_TIMESTAMP,
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name
-                  ).input_watermark()
-              )
-          )
-
-          stage_results = self._run_stage(
-              runner_execution_context, bundle_context_manager)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong input watermark for %s. Expected %s, but got %s.' % (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name),
-              timestamp.MAX_TIMESTAMP,
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).output_watermark())
-          )
-
-          monitoring_infos_by_stage[stage.name] = (
-              list(stage_results.process_bundle.monitoring_infos))
-
-      monitoring_infos_by_stage[''] = list(
-          pipeline_metrics.to_runner_api_monitoring_infos('').values())
+        # Initialize Runner context:
+        # - Pipeline dictionaries, initial inputs and pipeline triggers
+        # - Replace Data API endpoints in protobufs.
+        runner_execution_context.setup()
+
+        bundle_counter = 0
+        # Start executing all ready bundles.
+        while len(runner_execution_context.queues.ready_inputs) > 0:

Review comment:
       This should not happen (hence the assertions in lines 439-447) - it is possible to break prematurely if there's a correctness issue, but otherwise we should keep processing data until the watermarks advance to infinity.
   
   For streaming, we'll need to change this logic slightly to also wait for walltime-pending inputs.
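
   A standalone sketch (hypothetical run_loop, execute and current_watermark names; deques of ((stage, threshold), inputs) pairs) of the loop shape described here, including the walltime wait that streaming would add:

       import time

       def run_loop(ready, watermark_pending, time_pending, execute, current_watermark):
         """Drains ready bundles, promoting pending inputs once their condition holds."""
         while ready or watermark_pending or time_pending:
           # Promote inputs whose scheduling watermark has been reached
           # (watermarks advance as bundles complete).
           while watermark_pending and watermark_pending[0][0][1] <= current_watermark():
             (stage, _wm), inputs = watermark_pending.popleft()
             ready.append((stage, inputs))
           # Promote inputs whose wall-clock (processing-time) trigger has fired.
           while time_pending and time_pending[0][0][1] <= time.time():
             (stage, _t), inputs = time_pending.popleft()
             ready.append((stage, inputs))
           if not ready:
             time.sleep(0.1)  # nothing ready yet; wait for walltime to advance
             continue
           stage, inputs = ready.popleft()
           execute(stage, inputs)  # may enqueue further work onto any queue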



