You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/08 17:49:10 UTC

[beam] branch master updated: SDF bounded wrapper returns None when any exception happens in the calculation.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 961789e  SDF bounded wrapper returns None when any exception happens in the calculation.
     new dada0f9  Merge pull request #14439 from boyuanzz/fix_py
961789e is described below

commit 961789eded3fad003f8b8d5b3d16d88892d33a40
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Apr 5 18:53:21 2021 -0700

    SDF bounded wrapper returns None when any exception happens in the calculation.
---
 sdks/python/apache_beam/io/iobase.py      | 60 +++++++++++++++++--------------
 sdks/python/apache_beam/io/iobase_test.py | 13 +++++++
 2 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 521da25..71d8037 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1499,33 +1499,39 @@ class _SDFBoundedSourceRestriction(object):
     return self._source_bundle.source
 
   def try_split(self, fraction_of_remainder):
-    consumed_fraction = self.range_tracker().fraction_consumed()
-    fraction = (
-        consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
-    position = self.range_tracker().position_at_fraction(fraction)
-    # Need to stash current stop_pos before splitting since
-    # range_tracker.split will update its stop_pos if splits
-    # successfully.
-    stop_pos = self._source_bundle.stop_position
-    split_result = self.range_tracker().try_split(position)
-    if split_result:
-      split_pos, split_fraction = split_result
-      primary_weight = self._source_bundle.weight * split_fraction
-      residual_weight = self._source_bundle.weight - primary_weight
-      # Update self to primary weight and end position.
-      self._source_bundle = SourceBundle(
-          primary_weight,
-          self._source_bundle.source,
-          self._source_bundle.start_position,
-          split_pos)
-      return (
-          self,
-          _SDFBoundedSourceRestriction(
-              SourceBundle(
-                  residual_weight,
-                  self._source_bundle.source,
-                  split_pos,
-                  stop_pos)))
+    """Tries to split the unconsumed remainder at fraction_of_remainder.
+
+    Returns a (primary, residual) tuple with `self` narrowed to the primary,
+    or None if the underlying range tracker refuses or fails to split.
+    """
+    try:
+      consumed_fraction = self.range_tracker().fraction_consumed()
+      fraction = (
+          consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+      position = self.range_tracker().position_at_fraction(fraction)
+      # Need to stash current stop_pos before splitting since
+      # range_tracker.split will update its stop_pos if splits
+      # successfully.
+      stop_pos = self._source_bundle.stop_position
+      split_result = self.range_tracker().try_split(position)
+    except Exception:  # pylint: disable=broad-except
+      # Treat any failure in the underlying tracker as the source refusing to
+      # split at this point; no split happens at the wrapper level.
+      return None
+    if not split_result:
+      return None
+    # The tracker has already committed to the split, so nothing below is
+    # guarded: swallowing an error here would silently drop the residual.
+    split_pos, split_fraction = split_result
+    primary_weight = self._source_bundle.weight * split_fraction
+    residual_weight = self._source_bundle.weight - primary_weight
+    # Update self to primary weight and end position.
+    self._source_bundle = SourceBundle(
+        primary_weight, self._source_bundle.source,
+        self._source_bundle.start_position, split_pos)
+    residual = SourceBundle(
+        residual_weight, self._source_bundle.source, split_pos, stop_pos)
+    return self, _SDFBoundedSourceRestriction(residual)
 
 
 class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index 303cb68..bde0566 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
 from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.concat_source_test import RangeSource
 from apache_beam.io import iobase
+from apache_beam.io import range_trackers
 from apache_beam.io.iobase import SourceBundle
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.testing.util import assert_that
@@ -181,6 +182,18 @@ class SDFBoundedSourceRestrictionTrackerTest(unittest.TestCase):
         actual_primary._source_bundle.weight,
         self.sdf_restriction_tracker.current_restriction().weight())
 
+  def test_try_split_with_any_exception(self):
+    # OFFSET_INFINITY is meant to make the underlying split calculation raise.
+    infinity = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
+    source_bundle = SourceBundle(
+        infinity, RangeSource(0, infinity), 0, infinity)
+    tracker = iobase._SDFBoundedSourceRestrictionTracker(
+        iobase._SDFBoundedSourceRestriction(source_bundle))
+    tracker.try_claim(0)
+    self.assertIsNone(tracker.try_split(0.5))
+    # A refused split must leave the current restriction untouched.
+    self.assertEqual(infinity, tracker.current_restriction().weight())
+
 
 class UseSdfBoundedSourcesTests(unittest.TestCase):
   def _run_sdf_wrapper_pipeline(self, source, expected_values):