You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/10/01 22:41:00 UTC

[jira] [Work logged] (BEAM-8823) Make FnApiRunner work by executing ready elements instead of stages

     [ https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=659152&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659152 ]

ASF GitHub Bot logged work on BEAM-8823:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Oct/21 22:40
            Start Date: 01/Oct/21 22:40
    Worklog Time Spent: 10m 
      Work Description: y1chi commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r719811714



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -111,15 +133,28 @@ def reset(self):
     pass
 
 
-class ListBuffer(object):
+class ListBuffer:
   """Used to support parititioning of a list."""
   def __init__(self, coder_impl):
-    # type: (CoderImpl) -> None
-    self._coder_impl = coder_impl
+    # type: (Optional[CoderImpl]) -> None
+    self._coder_impl = coder_impl or CoderImpl()

Review comment:
       Why do we need this abstract CoderImpl()?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,106 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload != translations.IMPULSE_BUFFER:
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+          elif t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+          coder_id = self.data_channel_coders[only_element(
+              transform.inputs.values())]
+        # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace the
+        # payload with the GRPC configuration for the Data channel.
+        bundle_manager = self.bundle_manager_for(stage)
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = (
+            bundle_manager.data_api_service_descriptor())
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if payload.side_inputs:
+          # If the stage needs side inputs, then it's not ready to be
+          # executed.
+          ready_to_schedule = False
+    if data_input and ready_to_schedule:
+      # We push the data inputs, along with the name of the consuming stage.
+      _LOGGER.debug('Scheduling bundle in stage for execution: %s', stage.name)
+      self.queues.ready_inputs.enque((stage.name, DataInput(data_input, {})))
+    elif data_input and not ready_to_schedule:
+      _LOGGER.debug(
+          'Enqueuing stage pending watermark. Stage name: %s', stage.name)
+      self.queues.watermark_pending_inputs.enque(
+          ((stage.name, MAX_TIMESTAMP), DataInput(data_input, {})))

Review comment:
       MAX_TIMESTAMP here seems weird to me. Should it be MIN_TIMESTAMP if we expect the input to be ready (maybe for streaming)?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -586,13 +718,26 @@ def __init__(self,
     self.pipeline_components = pipeline_components
     self.safe_coders = safe_coders
     self.data_channel_coders = data_channel_coders
+    self.num_workers = num_workers
+    # TODO(pabloem): Move Clock classes out of DirectRunner and into FnApiRnr
+    self.clock: Union[TestClock, RealClock] = (
+        TestClock() if uses_teststream else RealClock())
+    self.queues = _ProcessingQueueManager()
+
+    # The following set of dictionaries hold information mapping relationships
+    # between various pipeline elements.
+    self.input_transform_to_buffer_id: MutableMapping[str, bytes] = {}
+    self.pcollection_to_producer_transform: MutableMapping[Union[str, bytes],
+                                                           Optional[str]] = {}
+    # Map of buffer_id to its consumers. A consumer is the pair of
+    # Stage name + Ptransform name that consume that buffer.
+    self.buffer_id_to_consumer_pairs: Dict[bytes, Set[Tuple[str, str]]] = {}
+    self._compute_pipeline_dictionaries()
 
-    self.input_transform_to_buffer_id = {
-        t.unique_name: t.spec.payload
-        for s in stages for t in s.transforms
-        if t.spec.urn == bundle_processor.DATA_INPUT_URN
-    }
     self.watermark_manager = WatermarkManager(stages)
+    # from apache_beam.runners.portability.fn_api_runner import \

Review comment:
       Should this be removed?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,106 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload != translations.IMPULSE_BUFFER:

Review comment:
       I think it is better to just use a plain if/else here:
   ```
   if t.spec.payload == translations.IMPULSE_BUFFER:
    ...
   else:
    ...
   ```

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -76,16 +82,10 @@
   from apache_beam.runners.portability.fn_api_runner.worker_handlers import WorkerHandler
 
 _LOGGER = logging.getLogger(__name__)
+# _LOGGER.setLevel('DEBUG')

Review comment:
       Should this commented-out line be removed?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -471,9 +559,17 @@ def _collect_written_timers(
             timer_watermark_data[(transform_id, timer_family_id)] = min(
                 timer_watermark_data[(transform_id, timer_family_id)],
                 decoded_timer.hold_timestamp)
-        newly_set_timers[(transform_id, timer_family_id)] = ListBuffer(
-            coder_impl=timer_coder_impl)
-        newly_set_timers[(transform_id, timer_family_id)].append(out.get())
+          else:
+            # Timer was cleared, so we must skip setting it below.
+            timer_cleared = True
+            continue
+        if timer_cleared or (transform_id,

Review comment:
       What happens if the same timer was set multiple times, with clear being called in between?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -584,100 +680,156 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
             channel_split.transform_id] = channel_split.last_primary_element
     return pcolls_with_delayed_apps, transforms_with_channel_splits
 
-  def _run_stage(self,
+  def _execute_bundle(self,
                  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
                  bundle_context_manager,  # type: execution.BundleContextManager
-                ):
-    # type: (...) -> beam_fn_api_pb2.InstructionResponse
-
-    """Run an individual stage.
+                 bundle_input: DataInput
+                ) -> beam_fn_api_pb2.InstructionResponse:
+    """Execute a bundle end-to-end.
 
     Args:
       runner_execution_context (execution.FnApiRunnerExecutionContext): An
         object containing execution information for the pipeline.
       bundle_context_manager (execution.BundleContextManager): A description of
         the stage to execute, and its context.
+      bundle_input: The set of buffers to input into this bundle
     """
-    data_input, data_output, expected_timer_output = (
-        bundle_context_manager.extract_bundle_inputs_and_outputs())
-    input_timers = {
-    }  # type: Mapping[Tuple[str, str], execution.PartitionableBuffer]
-
     worker_handler_manager = runner_execution_context.worker_handler_manager
-    _LOGGER.info('Running %s', bundle_context_manager.stage.name)
+
+    # TODO(pabloem): Should move this to be done once per stage
     worker_handler_manager.register_process_bundle_descriptor(
         bundle_context_manager.process_bundle_descriptor)
 
-    # We create the bundle manager here, as it can be reused for bundles of the
-    # same stage, but it may have to be created by-bundle later on.
+    # We create the bundle manager here, as it can be reused for bundles of
+    # the same stage, but it may have to be created by-bundle later on.
+    bundle_manager = self._get_bundle_manager(bundle_context_manager)
+
+    last_result, deferred_inputs, newly_set_timers, watermark_updates = (
+        self._run_bundle(
+            runner_execution_context,
+            bundle_context_manager,
+            bundle_input,
+            bundle_context_manager.stage_data_outputs,
+            bundle_context_manager.stage_timer_outputs,
+            bundle_manager))
+
+    for pc_name, watermark in watermark_updates.items():
+      runner_execution_context.watermark_manager.set_pcoll_watermark(
+          pc_name, watermark)
+
+    if deferred_inputs:
+      assert (runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name).output_watermark()
+              < timestamp.MAX_TIMESTAMP), (
+          'wrong timestamp for %s. '
+          % runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name))
+      runner_execution_context.queues.ready_inputs.enque(
+          (bundle_context_manager.stage.name, DataInput(deferred_inputs, {})))
+
+    self._enqueue_set_timers(
+        runner_execution_context,
+        bundle_context_manager,
+        newly_set_timers,
+        bundle_input)
+
+    # Store the required downstream side inputs into state so it is accessible
+    # for the worker when it runs bundles that consume this stage's output.
+    data_side_input = (
+        runner_execution_context.side_input_descriptors_by_stage.get(
+            bundle_context_manager.stage.name, {}))
+    runner_execution_context.commit_side_inputs_to_state(data_side_input)
+
+    buffers_to_clean = set()
+    known_consumers = set()
+    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+      for (consuming_stage_name, consuming_transform) in \
+          runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+                                                                   []):
+        buffer = runner_execution_context.pcoll_buffers.get(
+            buffer_id, ListBuffer(None))
+
+        if buffer and buffer_id in buffers_to_clean:
+          runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
+          buffer = runner_execution_context.pcoll_buffers[buffer_id]
+        if buffer_id in runner_execution_context.pcoll_buffers:
+          buffers_to_clean.add(buffer_id)

Review comment:
       I found it a bit hard to follow the logic here. Are we just trying to pop the pcollection buffer with the buffer id?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 659152)
    Time Spent: 3h 40m  (was: 3.5h)

> Make FnApiRunner work by executing ready elements instead of stages
> -------------------------------------------------------------------
>
>                 Key: BEAM-8823
>                 URL: https://issues.apache.org/jira/browse/BEAM-8823
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)