You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/12/16 17:03:18 UTC
[beam] branch master updated: Fix concat source fraction_consumed()
in degenerate case.
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cf20794 Fix concat source fraction_consumed() in degenerate case.
new 8f79065 Merge pull request #13563 from robertwb/concat-isdone
cf20794 is described below
commit cf2079447391f8333134f981a3f912be073bf3a2
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Dec 15 17:58:39 2020 -0800
Fix concat source fraction_consumed() in degenerate case.
---
sdks/python/apache_beam/io/concat_source.py | 9 ++++++---
sdks/python/apache_beam/io/concat_source_test.py | 8 ++++++++
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 4f799a9..e491f86 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -229,9 +229,12 @@ class ConcatRangeTracker(iobase.RangeTracker):
def fraction_consumed(self):
with self._lock:
- return self.local_to_global(
- self._claimed_source_ix,
- self.sub_range_tracker(self._claimed_source_ix).fraction_consumed())
+ if self._claimed_source_ix == len(self._source_bundles):
+ return 1.0
+ else:
+ return self.local_to_global(
+ self._claimed_source_ix,
+ self.sub_range_tracker(self._claimed_source_ix).fraction_consumed())
def local_to_global(self, source_ix, source_frac):
cw = self._cumulative_weights
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index b796ff3..622e8e9 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -135,6 +135,14 @@ class ConcatSourceTest(unittest.TestCase):
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
+ def test_fraction_consumed_at_end(self):
+ source = ConcatSource([
+ RangeSource(0, 2),
+ RangeSource(2, 4),
+ ])
+ range_tracker = source.get_range_tracker((2, None), None)
+ self.assertEqual(range_tracker.fraction_consumed(), 1.0)
+
def test_estimate_size(self):
source = ConcatSource([
RangeSource(0, 10),