You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/08 17:49:10 UTC
[beam] branch master updated: SDF bounded wrapper returns None when any exception happen in the calculation.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 961789e SDF bounded wrapper returns None when any exception happen in the calculation.
new dada0f9 Merge pull request #14439 from boyuanzz/fix_py
961789e is described below
commit 961789eded3fad003f8b8d5b3d16d88892d33a40
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Apr 5 18:53:21 2021 -0700
SDF bounded wrapper returns None when any exception happen in the calculation.
---
sdks/python/apache_beam/io/iobase.py | 60 +++++++++++++++++--------------
sdks/python/apache_beam/io/iobase_test.py | 13 +++++++
2 files changed, 46 insertions(+), 27 deletions(-)
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 521da25..71d8037 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1499,33 +1499,39 @@ class _SDFBoundedSourceRestriction(object):
return self._source_bundle.source
def try_split(self, fraction_of_remainder):
- consumed_fraction = self.range_tracker().fraction_consumed()
- fraction = (
- consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
- position = self.range_tracker().position_at_fraction(fraction)
- # Need to stash current stop_pos before splitting since
- # range_tracker.split will update its stop_pos if splits
- # successfully.
- stop_pos = self._source_bundle.stop_position
- split_result = self.range_tracker().try_split(position)
- if split_result:
- split_pos, split_fraction = split_result
- primary_weight = self._source_bundle.weight * split_fraction
- residual_weight = self._source_bundle.weight - primary_weight
- # Update self to primary weight and end position.
- self._source_bundle = SourceBundle(
- primary_weight,
- self._source_bundle.source,
- self._source_bundle.start_position,
- split_pos)
- return (
- self,
- _SDFBoundedSourceRestriction(
- SourceBundle(
- residual_weight,
- self._source_bundle.source,
- split_pos,
- stop_pos)))
+ try:
+ consumed_fraction = self.range_tracker().fraction_consumed()
+ fraction = (
+ consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+ position = self.range_tracker().position_at_fraction(fraction)
+ # Need to stash current stop_pos before splitting since
+ # range_tracker.split will update its stop_pos if splits
+ # successfully.
+ stop_pos = self._source_bundle.stop_position
+ split_result = self.range_tracker().try_split(position)
+ if split_result:
+ split_pos, split_fraction = split_result
+ primary_weight = self._source_bundle.weight * split_fraction
+ residual_weight = self._source_bundle.weight - primary_weight
+ # Update self to primary weight and end position.
+ self._source_bundle = SourceBundle(
+ primary_weight,
+ self._source_bundle.source,
+ self._source_bundle.start_position,
+ split_pos)
+ return (
+ self,
+ _SDFBoundedSourceRestriction(
+ SourceBundle(
+ residual_weight,
+ self._source_bundle.source,
+ split_pos,
+ stop_pos)))
+ except Exception:
+ # For any exceptions from underlying trySplit calls, the wrapper will
+ # think that the source refuses to split at this point. In this case,
+ # no split happens at the wrapper level.
+ return None
class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index 303cb68..bde0566 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io import iobase
+from apache_beam.io import range_trackers
from apache_beam.io.iobase import SourceBundle
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.testing.util import assert_that
@@ -181,6 +182,18 @@ class SDFBoundedSourceRestrictionTrackerTest(unittest.TestCase):
actual_primary._source_bundle.weight,
self.sdf_restriction_tracker.current_restriction().weight())
+ def test_try_split_with_any_exception(self):
+ source_bundle = SourceBundle(
+ range_trackers.OffsetRangeTracker.OFFSET_INFINITY,
+ RangeSource(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY),
+ 0,
+ range_trackers.OffsetRangeTracker.OFFSET_INFINITY)
+ self.sdf_restriction_tracker = (
+ iobase._SDFBoundedSourceRestrictionTracker(
+ iobase._SDFBoundedSourceRestriction(source_bundle)))
+ self.sdf_restriction_tracker.try_claim(0)
+ self.assertIsNone(self.sdf_restriction_tracker.try_split(0.5))
+
class UseSdfBoundedSourcesTests(unittest.TestCase):
def _run_sdf_wrapper_pipeline(self, source, expected_values):