You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/17 18:42:49 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #12275: [BEAM-10420] Add support for per window invocation of beam:transform:sdf_process_sized_element_and_restrictions:v1

boyuanzz commented on a change in pull request #12275:
URL: https://github.com/apache/beam/pull/12275#discussion_r456578334



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1301,18 +1302,22 @@ def __repr__(self):
   @property
   def completed_work(self):
     # type: () -> float
-    if self._completed:
+    if self._completed is not None:
       return self._completed
-    elif self._remaining and self._fraction:
+    elif self._remaining is not None and self._fraction:

Review comment:
       You may also want to check `self._fraction is not None`.

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -735,7 +733,44 @@ def invoke_process(self,
     else:
       self._invoke_process_per_window(
           windowed_value, additional_args, additional_kwargs)
-    return None
+    return residuals
+
+  def _should_process_window_for_sdf(
+      self,
+      windowed_value, # type: WindowedValue
+      additional_kwargs,
+      window_index=None, # type: Optional[int]
+  ):
+    restriction_tracker = self.invoke_create_tracker(self.restriction)

Review comment:
       Would it be better to first check whether `False` should be returned, and only then create `restriction_tracker` and `watermark_estimator`?
   
   The function itself also seems ambiguous to me: it is used to prepare the params for processing the SDF regardless of whether the window is observed. Is it possible to separate the check from the part that creates the params?

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1301,18 +1302,22 @@ def __repr__(self):
   @property
   def completed_work(self):
     # type: () -> float
-    if self._completed:
+    if self._completed is not None:
       return self._completed
-    elif self._remaining and self._fraction:
+    elif self._remaining is not None and self._fraction:
       return self._remaining * self._fraction / (1 - self._fraction)
+    else:
+      return self._fraction
 
   @property
   def remaining_work(self):
     # type: () -> float
-    if self._remaining:
+    if self._remaining is not None:
       return self._remaining
-    elif self._completed:
+    elif self._completed is not None and self._fraction:

Review comment:
       Same as above.

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -685,46 +689,40 @@ def invoke_process(self,
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
 
+    residuals = []
     if self.is_splittable:
-      restriction_tracker = self.invoke_create_tracker(restriction)
-      watermark_estimator = self.invoke_create_watermark_estimator(
-          watermark_estimator_state)
-
-      if len(windowed_value.windows) > 1 and self.has_windowed_inputs:
-        # Should never get here due to window explosion in
-        # the upstream pair-with-restriction.
-        raise NotImplementedError(
-            'SDFs in multiply-windowed values with windowed arguments.')
       with self.splitting_lock:
-        self.threadsafe_restriction_tracker = ThreadsafeRestrictionTracker(
-            restriction_tracker)
         self.current_windowed_value = windowed_value
-        self.threadsafe_watermark_estimator = (
-            ThreadsafeWatermarkEstimator(watermark_estimator))
-
-      restriction_tracker_param = (
-          self.signature.process_method.restriction_provider_arg_name)
-      if not restriction_tracker_param:
-        raise ValueError(
-            'DoFn is splittable but DoFn does not have a '
-            'RestrictionTrackerParam defined')
-      additional_kwargs[restriction_tracker_param] = (
-          RestrictionTrackerView(self.threadsafe_restriction_tracker))
-      watermark_param = (
-          self.signature.process_method.watermark_estimator_provider_arg_name)
-      # When the watermark_estimator is a NoOpWatermarkEstimator, the system
-      # will not add watermark_param into the DoFn param list.
-      if watermark_param is not None:
-        additional_kwargs[watermark_param] = self.threadsafe_watermark_estimator
+        self.restriction = restriction
+        self.watermark_estimator_state = watermark_estimator_state
       try:
-        return self._invoke_process_per_window(
-            windowed_value, additional_args, additional_kwargs)
+        if self.has_windowed_inputs and len(windowed_value.windows) != 1:

Review comment:
       Should this be `len(windowed_value.windows) > 1`?

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -845,52 +880,241 @@ def _invoke_process_per_window(self,
             deferred_timestamp=deferred_timestamp)
     return None
 
+  @staticmethod
+  def _try_split(fraction,
+      window_index, # type: Optional[int]
+      stop_window_index, # type: Optional[int]
+      windowed_value, # type: WindowedValue
+      restriction,
+      watermark_estimator_state,
+      restriction_provider, # type: RestrictionProvider
+      restriction_tracker, # type: RestrictionTracker
+      watermark_estimator, # type: WatermarkEstimator
+                 ):
+    # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual], Optional[int]]]
+
+    """Try to split returning a primaries, residuals and a new stop index.
+
+    For non-window observing splittable DoFns we split the current restriction
+    and assign the primary and residual to all the windows.
+
+    For window observing splittable DoFns, we:
+    1) return a split at a window boundary if the fraction lies outside of the
+       current window.
+    2) attempt to split the current restriction, if successful then return
+       the primary and residual for the current window and an additional
+       primary and residual for any fully processed and fully unprocessed
+       windows.
+    3) fall back to returning a split at the window boundary if possible
+
+    Args:
+      window_index: the current index of the window being processed or None
+                    if the splittable DoFn is not window observing.
+      stop_window_index: the current index to stop processing at or None
+                         if the splittable DoFn is not window observing.
+      windowed_value: the current windowed value
+      restriction: the initial restriction when processing was started.
+      watermark_estimator_state: the initial watermark estimator state when
+                                 processing was started.
+      restriction_provider: the DoFn's restriction provider
+      restriction_tracker: the current restriction tracker
+      watermark_estimator: the current watermark estimator
+
+    Returns:
+      A tuple containing (primaries, residuals, new_stop_index) or None if
+      splitting was not possible. new_stop_index will only be set if the
+      splittable DoFn is window observing otherwise it will be None.
+    """
+    def compute_whole_window_split(to_index, from_index):
+      restriction_size = restriction_provider.restriction_size(
+          windowed_value, restriction)
+      # The primary and residual both share the same value only differing
+      # by the set of windows they are in.
+      value = ((windowed_value.value, (restriction, watermark_estimator_state)),
+               restriction_size)
+      primary_restriction = SplitResultPrimary(
+          primary_value=WindowedValue(
+              value,
+              windowed_value.timestamp,
+              windowed_value.windows[:to_index])) if to_index > 0 else None
+      # Don't report any updated watermarks for the residual since they have
+      # not processed any part of the restriction.
+      residual_restriction = SplitResultResidual(
+          residual_value=WindowedValue(
+              value,
+              windowed_value.timestamp,
+              windowed_value.windows[from_index:stop_window_index]),
+          current_watermark=None,
+          deferred_timestamp=None) if from_index < stop_window_index else None
+      return (primary_restriction, residual_restriction)
+
+    primary_restrictions = []
+    residual_restrictions = []
+
+    window_observing = window_index is not None
+    # If we are processing each window separately and we aren't on the last
+    # window then compute whether the split lies within the current window
+    # or a future window.
+    if window_observing and window_index != stop_window_index - 1:
+      progress = restriction_tracker.current_progress()
+      if not progress:
+        # Assume no work has been completed for the current window if progress
+        # is unavailable.
+        from apache_beam.io.iobase import RestrictionProgress
+        progress = RestrictionProgress(completed=0, remaining=1)
+
+      scaled_progress = PerWindowInvoker._scale_progress(
+          progress, window_index, stop_window_index)
+      # Compute the fraction of the remainder relative to the scaled progress.
+      # If the value is greater than or equal to progress.remaining_work then we
+      # should split at the closest window boundary.
+      fraction_of_remainder = scaled_progress.remaining_work * fraction
+      if fraction_of_remainder >= progress.remaining_work:
+        # The fraction is outside of the current window and hence we will
+        # split at the closest window boundary. Favor a split and return the
+        # last window if we would have rounded up to the end of the window
+        # based upon the fraction.
+        new_stop_window_index = min(
+            stop_window_index - 1,
+            window_index + max(
+                1,
+                int(
+                    round((
+                        progress.completed_work +
+                        scaled_progress.remaining_work * fraction) /
+                          progress.total_work))))
+        primary, residual = compute_whole_window_split(
+            new_stop_window_index, new_stop_window_index)
+        assert primary is not None
+        assert residual is not None
+        return ([primary], [residual], new_stop_window_index)
+      else:
+        # The fraction is within the current window being processed so compute
+        # the updated fraction based upon the number of windows being processed.
+        new_stop_window_index = window_index + 1
+        fraction = fraction_of_remainder / progress.remaining_work
+        # Attempt to split below, if we can't then we'll compute a split
+        # using only window boundaries
+    else:
+      # We aren't splitting within multiple windows so we don't change our
+      # stop index.
+      new_stop_window_index = stop_window_index
+
+    # Temporary workaround for [BEAM-7473]: get current_watermark before
+    # split, in case watermark gets advanced before getting split results.
+    # In worst case, current_watermark is always stale, which is ok.
+    current_watermark = (watermark_estimator.current_watermark())
+    current_estimator_state = (watermark_estimator.get_estimator_state())
+    split = restriction_tracker.try_split(fraction)
+    if split:
+      primary, residual = split
+      element = windowed_value.value
+      primary_size = restriction_provider.restriction_size(
+          windowed_value.value, primary)
+      residual_size = restriction_provider.restriction_size(
+          windowed_value.value, residual)
+      # We use the watermark estimator state for the original process call
+      # for the primary and the updated watermark estimator state for the
+      # residual for the split.
+      primary_split_value = ((element, (primary, watermark_estimator_state)),
+                             primary_size)
+      residual_split_value = ((element, (residual, current_estimator_state)),
+                              residual_size)
+      windows = (
+          windowed_value.windows[window_index],
+      ) if window_observing else windowed_value.windows
+      primary_restrictions.append(
+          SplitResultPrimary(
+              primary_value=WindowedValue(
+                  primary_split_value, windowed_value.timestamp, windows)))
+      residual_restrictions.append(
+          SplitResultResidual(
+              residual_value=WindowedValue(
+                  residual_split_value, windowed_value.timestamp, windows),
+              current_watermark=current_watermark,
+              deferred_timestamp=None))
+
+      if window_observing:
+        assert new_stop_window_index == window_index + 1
+        primary, residual = compute_whole_window_split(
+            window_index, window_index + 1)
+        if primary:
+          primary_restrictions.append(primary)
+        if residual:
+          residual_restrictions.append(residual)
+      return (
+          primary_restrictions, residual_restrictions, new_stop_window_index)
+    elif new_stop_window_index and new_stop_window_index != stop_window_index:
+      # If we failed to split but have a new stop index then return a split
+      # at the window boundary.
+      primary, residual = compute_whole_window_split(
+          new_stop_window_index, new_stop_window_index)
+      assert primary is not None
+      assert residual is not None
+      return ([primary], [residual], new_stop_window_index)
+    else:
+      return None
+
   def try_split(self, fraction):
-    # type: (...) -> Optional[Tuple[SplitResultPrimary, SplitResultResidual]]
+    # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual]]]
     if not self.is_splittable:
       return None
 
     with self.splitting_lock:
+      if not self.threadsafe_restriction_tracker:
+        return None
+
       # Make a local reference to member variables that change references during
       # processing under lock before attempting to split so we have a consistent
       # view of all the references.
-      current_windowed_value = self.current_windowed_value
-      threadsafe_restriction_tracker = self.threadsafe_restriction_tracker
-      threadsafe_watermark_estimator = self.threadsafe_watermark_estimator
-
-    if threadsafe_restriction_tracker:
-      # Temporary workaround for [BEAM-7473]: get current_watermark before
-      # split, in case watermark gets advanced before getting split results.
-      # In worst case, current_watermark is always stale, which is ok.
-      current_watermark = (threadsafe_watermark_estimator.current_watermark())
-      estimator_state = (threadsafe_watermark_estimator.get_estimator_state())
-      split = threadsafe_restriction_tracker.try_split(fraction)
-      if split:
-        primary, residual = split
-        element = current_windowed_value.value
-        restriction_provider = self.signature.get_restriction_provider()
-        primary_size = restriction_provider.restriction_size(element, primary)
-        residual_size = restriction_provider.restriction_size(element, residual)
-        primary_value = ((element, (primary, None)), primary_size)
-        residual_value = ((element, (residual, estimator_state)), residual_size)
-        return (
-            SplitResultPrimary(
-                primary_value=current_windowed_value.with_value(primary_value)),
-            SplitResultResidual(
-                residual_value=current_windowed_value.with_value(
-                    residual_value),
-                current_watermark=current_watermark,
-                deferred_timestamp=None))
-    return None
+      result = PerWindowInvoker._try_split(
+          fraction,
+          self.current_window_index,
+          self.stop_window_index,
+          self.current_windowed_value,
+          self.restriction,
+          self.watermark_estimator_state,
+          self.signature.get_restriction_provider(),
+          self.threadsafe_restriction_tracker,
+          self.threadsafe_watermark_estimator)
+      if not result:
+        return None
+
+      residuals, primaries, self.stop_window_index = result
+      return (residuals, primaries)
+
+  @staticmethod
+  def _scale_progress(progress, window_index, stop_window_index):
+    # We scale progress based upon the amount of work we will do for one
+    # window and have it apply for all windows.
+    completed = window_index * progress.total_work + progress.completed_work
+    remaining = (
+        stop_window_index -
+        (window_index + 1)) * progress.total_work + progress.remaining_work
+    from apache_beam.io.iobase import RestrictionProgress
+    return RestrictionProgress(completed=completed, remaining=remaining)
 
   def current_element_progress(self):
     # type: () -> Optional[RestrictionProgress]
-    restriction_tracker = self.threadsafe_restriction_tracker
-    if restriction_tracker:
-      return restriction_tracker.current_progress()
-    else:
+    if not self.is_splittable:
       return None
 
+    with self.splitting_lock:
+      current_window_index = self.current_window_index
+      stop_window_index = self.stop_window_index
+      threadsafe_restriction_tracker = self.threadsafe_restriction_tracker
+
+    if not threadsafe_restriction_tracker:
+      return None
+
+    progress = threadsafe_restriction_tracker.current_progress()
+    if not current_window_index or not progress:
+      return progress
+
+    assert stop_window_index

Review comment:
       Can we add a comment here to explain why `stop_window_index` should not be `None`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org