You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/19 23:58:44 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r853599231


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1170,6 +1241,9 @@ def __init__(self,
     else:
       per_element_output_counter = None
 
+    # TODO: output processor assumes DoFns are batch-to-batch or
+    # element-to-element, @yields_batches and @yields_elements will break this
+    # assumption.

Review Comment:
   Tag the appropriate jira here



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -329,7 +432,9 @@ def finish(self):
     # type: () -> None
 
     """Finish operation."""
-    pass
+    # TODO: Do we need an output_index here

Review Comment:
   This is still an open question



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1450,78 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        # TODO: Helpful message
+        raise RuntimeError
+      if isinstance(result, WindowedBatch):
+        windowed_batch = result
+        # TODO: Should we do this in batches?
+        #       Would need to require one batch per window
+        #if (windowed_input_element is not None and
+        #    len(windowed_input_element.windows) != 1):
+        #  windowed_value.windows *= len(windowed_input_element.windows)
+      # TODO: TimestampedBatch
+      #elif isinstance(result, TimestampedValue):
+      #  assign_context = WindowFn.AssignContext(result.timestamp, result.value)
+      #  windowed_value = WindowedValue(
+      #      result.value,
+      #      result.timestamp,
+      #      self.window_fn.assign(assign_context))
+      #  if len(windowed_input_element.windows) != 1:
+      #    windowed_value.windows *= len(windowed_input_element.windows)
+      else:
+        # TODO: This should error unless the DoFn was defined with
+        # @DoFn.yields_batches(output_aligned_with_input=True)
+        # We should consider also validating that the length is the same as
+        # windowed_input_batch
+        windowed_batch = windowed_input_batch.with_values(result)
+
+      if watermark_estimator is not None:
+        for timestamp in windowed_batch.timestamps:
+          watermark_estimator.observe_timestamp(timestamp)
+      if tag is None:
+        self.main_receivers.receive_batch(windowed_batch)
+      else:
+        self.tagged_receivers[tag].receive_batch(windowed_batch)

Review Comment:
   OutputProcessor logic needs to be cleaned up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org