You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/04 23:43:08 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15441: [BEAM-8823] Make FnApiRunner work by executing ready elements instead of stages

pabloem commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r721788623



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,106 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload != translations.IMPULSE_BUFFER:

Review comment:
       done

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -76,16 +82,10 @@
   from apache_beam.runners.portability.fn_api_runner.worker_handlers import WorkerHandler
 
 _LOGGER = logging.getLogger(__name__)
+# _LOGGER.setLevel('DEBUG')

Review comment:
       done

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -586,13 +718,26 @@ def __init__(self,
     self.pipeline_components = pipeline_components
     self.safe_coders = safe_coders
     self.data_channel_coders = data_channel_coders
+    self.num_workers = num_workers
+    # TODO(pabloem): Move Clock classes out of DirectRunner and into FnApiRnr
+    self.clock: Union[TestClock, RealClock] = (
+        TestClock() if uses_teststream else RealClock())
+    self.queues = _ProcessingQueueManager()
+
+    # The following set of dictionaries hold information mapping relationships
+    # between various pipeline elements.
+    self.input_transform_to_buffer_id: MutableMapping[str, bytes] = {}
+    self.pcollection_to_producer_transform: MutableMapping[Union[str, bytes],
+                                                           Optional[str]] = {}
+    # Map of buffer_id to its consumers. A consumer is the pair of
+    # Stage name + Ptransform name that consume that buffer.
+    self.buffer_id_to_consumer_pairs: Dict[bytes, Set[Tuple[str, str]]] = {}
+    self._compute_pipeline_dictionaries()
 
-    self.input_transform_to_buffer_id = {
-        t.unique_name: t.spec.payload
-        for s in stages for t in s.transforms
-        if t.spec.urn == bundle_processor.DATA_INPUT_URN
-    }
     self.watermark_manager = WatermarkManager(stages)
+    # from apache_beam.runners.portability.fn_api_runner import \

Review comment:
       I'd like to keep this here as a hint to show that the pipeline can be visualized here.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -111,15 +133,28 @@ def reset(self):
     pass
 
 
-class ListBuffer(object):
+class ListBuffer:
   """Used to support parititioning of a list."""
   def __init__(self, coder_impl):
-    # type: (CoderImpl) -> None
-    self._coder_impl = coder_impl
+    # type: (Optional[CoderImpl]) -> None
+    self._coder_impl = coder_impl or CoderImpl()

Review comment:
       it's to guarantee the type will be `CoderImpl` for `self._coder_impl` and pass typing tests

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -471,9 +559,17 @@ def _collect_written_timers(
             timer_watermark_data[(transform_id, timer_family_id)] = min(
                 timer_watermark_data[(transform_id, timer_family_id)],
                 decoded_timer.hold_timestamp)
-        newly_set_timers[(transform_id, timer_family_id)] = ListBuffer(
-            coder_impl=timer_coder_impl)
-        newly_set_timers[(transform_id, timer_family_id)].append(out.get())
+          else:
+            # Timer was cleared, so we must skip setting it below.
+            timer_cleared = True
+            continue
+        if timer_cleared or (transform_id,

Review comment:
       the SDK would only send back the latest of these events, independently of what it is. is that reasonable?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -584,100 +680,156 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
             channel_split.transform_id] = channel_split.last_primary_element
     return pcolls_with_delayed_apps, transforms_with_channel_splits
 
-  def _run_stage(self,
+  def _execute_bundle(self,
                  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
                  bundle_context_manager,  # type: execution.BundleContextManager
-                ):
-    # type: (...) -> beam_fn_api_pb2.InstructionResponse
-
-    """Run an individual stage.
+                 bundle_input: DataInput
+                ) -> beam_fn_api_pb2.InstructionResponse:
+    """Execute a bundle end-to-end.
 
     Args:
       runner_execution_context (execution.FnApiRunnerExecutionContext): An
         object containing execution information for the pipeline.
       bundle_context_manager (execution.BundleContextManager): A description of
         the stage to execute, and its context.
+      bundle_input: The set of buffers to input into this bundle
     """
-    data_input, data_output, expected_timer_output = (
-        bundle_context_manager.extract_bundle_inputs_and_outputs())
-    input_timers = {
-    }  # type: Mapping[Tuple[str, str], execution.PartitionableBuffer]
-
     worker_handler_manager = runner_execution_context.worker_handler_manager
-    _LOGGER.info('Running %s', bundle_context_manager.stage.name)
+
+    # TODO(pabloem): Should move this to be done once per stage
     worker_handler_manager.register_process_bundle_descriptor(
         bundle_context_manager.process_bundle_descriptor)
 
-    # We create the bundle manager here, as it can be reused for bundles of the
-    # same stage, but it may have to be created by-bundle later on.
+    # We create the bundle manager here, as it can be reused for bundles of
+    # the same stage, but it may have to be created by-bundle later on.
+    bundle_manager = self._get_bundle_manager(bundle_context_manager)
+
+    last_result, deferred_inputs, newly_set_timers, watermark_updates = (
+        self._run_bundle(
+            runner_execution_context,
+            bundle_context_manager,
+            bundle_input,
+            bundle_context_manager.stage_data_outputs,
+            bundle_context_manager.stage_timer_outputs,
+            bundle_manager))
+
+    for pc_name, watermark in watermark_updates.items():
+      runner_execution_context.watermark_manager.set_pcoll_watermark(
+          pc_name, watermark)
+
+    if deferred_inputs:
+      assert (runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name).output_watermark()
+              < timestamp.MAX_TIMESTAMP), (
+          'wrong timestamp for %s. '
+          % runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name))
+      runner_execution_context.queues.ready_inputs.enque(
+          (bundle_context_manager.stage.name, DataInput(deferred_inputs, {})))
+
+    self._enqueue_set_timers(
+        runner_execution_context,
+        bundle_context_manager,
+        newly_set_timers,
+        bundle_input)
+
+    # Store the required downstream side inputs into state so it is accessible
+    # for the worker when it runs bundles that consume this stage's output.
+    data_side_input = (
+        runner_execution_context.side_input_descriptors_by_stage.get(
+            bundle_context_manager.stage.name, {}))
+    runner_execution_context.commit_side_inputs_to_state(data_side_input)
+
+    buffers_to_clean = set()
+    known_consumers = set()
+    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+      for (consuming_stage_name, consuming_transform) in \
+          runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+                                                                   []):
+        buffer = runner_execution_context.pcoll_buffers.get(
+            buffer_id, ListBuffer(None))
+
+        if buffer and buffer_id in buffers_to_clean:
+          runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
+          buffer = runner_execution_context.pcoll_buffers[buffer_id]
+        if buffer_id in runner_execution_context.pcoll_buffers:
+          buffers_to_clean.add(buffer_id)

Review comment:
       I've added comments for the special cases. lmk if that makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org