Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/13 21:16:05 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11653: [BEAM-9935] Respect allowed split points in Python.

lostluck commented on a change in pull request #11653:
URL: https://github.com/apache/beam/pull/11653#discussion_r424735747



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -237,30 +235,72 @@ def try_split(self, fraction_of_remainder, total_buffer_size):
           current_element_progress = (
               current_element_progress_object.fraction_completed)
       # Now figure out where to split.
-      # The units here (except for keep_of_element_remainder) are all in
-      # terms of number of (possibly fractional) elements.
-      remainder = total_buffer_size - self.index - current_element_progress
-      keep = remainder * fraction_of_remainder
-      if current_element_progress < 1:
-        keep_of_element_remainder = keep / (1 - current_element_progress)
-        # If it's less than what's left of the current element,
-        # try splitting at the current element.
-        if keep_of_element_remainder < 1:
-          split = self.receivers[0].try_split(
-              keep_of_element_remainder
-          )  # type: Optional[Tuple[operations.SdfSplitResultsPrimary, operations.SdfSplitResultsResidual]]
-          if split:
-            element_primary, element_residual = split
-            self.stop = self.index + 1
-            return self.index - 1, element_primary, element_residual, self.stop
-      # Otherwise, split at the closest element boundary.
-      # pylint: disable=round-builtin
-      stop_index = (
-          self.index + max(1, int(round(current_element_progress + keep))))
-      if stop_index < self.stop:
-        self.stop = stop_index
-        return self.stop - 1, None, None, self.stop
-    return None
+      split = self._compute_split(
+          self.index,
+          current_element_progress,
+          self.stop,
+          fraction_of_remainder,
+          total_buffer_size,
+          allowed_split_points,
+          self.receivers[0].try_split)
+      if split:
+        self.stop = split[-1]
+      return split
+
+  @staticmethod
+  def _compute_split(
+      index,
+      current_element_progress,
+      stop,
+      fraction_of_remainder,
+      total_buffer_size,

Review comment:
       The part I missed was that this is provided by the runner as part of the DesiredSplit, rather than something computed by the SDK itself. I didn't see that change. This all makes sense. Thanks!
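The element-boundary part of the split is sketched below, assuming the runner supplies a list of allowed split points and the SDK only has to choose among them. This is a minimal illustration, not the code from this PR. `compute_boundary_split` is a hypothetical helper. It reuses the arithmetic from the removed lines of the diff (`remainder`, `keep`, the rounded `stop_index`) and then snaps the result to the closest allowed point. The nearest-point rule and the tie-break toward the earlier point are assumptions.

```python
import bisect
from typing import Optional, Sequence


def compute_boundary_split(
    index: int,
    current_element_progress: float,
    stop: int,
    fraction_of_remainder: float,
    total_buffer_size: int,
    allowed_split_points: Sequence[int] = (),
) -> Optional[int]:
  """Returns the new (exclusive) stop index for the primary, or None.

  Hypothetical helper for illustration only; it is not Beam's _compute_split.
  """
  # All units are (possibly fractional) elements, as in the removed code.
  remainder = total_buffer_size - index - current_element_progress
  keep = remainder * fraction_of_remainder
  # Always keep at least the element currently being processed.
  stop_index = index + max(1, int(round(current_element_progress + keep)))
  if allowed_split_points and stop_index not in allowed_split_points:
    # Assumption: only points strictly after the current element are usable,
    # and the desired stop index is snapped to the nearest of them.
    candidates = sorted(p for p in allowed_split_points if p > index)
    if not candidates:
      return None
    pos = bisect.bisect_left(candidates, stop_index)
    if pos == 0:
      stop_index = candidates[0]
    elif pos == len(candidates):
      stop_index = candidates[-1]
    else:
      before, after = candidates[pos - 1], candidates[pos]
      # Ties go to the earlier split point.
      if stop_index - before <= after - stop_index:
        stop_index = before
      else:
        stop_index = after
  # A split is only useful if it actually shrinks the current range.
  if index < stop_index < stop:
    return stop_index
  return None
```

For example, `compute_boundary_split(0, 0.0, 10, 0.5, 10)` wants to stop at 5. With `allowed_split_points=[3, 8]` it stops at 3, and with `[10]` it returns `None` because 10 would not shrink the range. The sub-element path (the `self.receivers[0].try_split` call in the diff) is deliberately left out to keep the sketch focused on how runner-provided split points constrain the boundary choice.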



