You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/12/16 17:03:18 UTC

[beam] branch master updated: Fix concat source fraction_consumed() in degenerate case.

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cf20794  Fix concat source fraction_consumed() in degenerate case.
     new 8f79065  Merge pull request #13563 from robertwb/concat-isdone
cf20794 is described below

commit cf2079447391f8333134f981a3f912be073bf3a2
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Dec 15 17:58:39 2020 -0800

    Fix concat source fraction_consumed() in degenerate case.
---
 sdks/python/apache_beam/io/concat_source.py      | 9 ++++++---
 sdks/python/apache_beam/io/concat_source_test.py | 8 ++++++++
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 4f799a9..e491f86 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -229,9 +229,12 @@ class ConcatRangeTracker(iobase.RangeTracker):
 
   def fraction_consumed(self):
     with self._lock:
-      return self.local_to_global(
-          self._claimed_source_ix,
-          self.sub_range_tracker(self._claimed_source_ix).fraction_consumed())
+      if self._claimed_source_ix == len(self._source_bundles):
+        return 1.0
+      else:
+        return self.local_to_global(
+            self._claimed_source_ix,
+            self.sub_range_tracker(self._claimed_source_ix).fraction_consumed())
 
   def local_to_global(self, source_ix, source_frac):
     cw = self._cumulative_weights
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index b796ff3..622e8e9 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -135,6 +135,14 @@ class ConcatSourceTest(unittest.TestCase):
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
 
+  def test_fraction_consumed_at_end(self):
+    source = ConcatSource([
+        RangeSource(0, 2),
+        RangeSource(2, 4),
+    ])
+    range_tracker = source.get_range_tracker((2, None), None)
+    self.assertEqual(range_tracker.fraction_consumed(), 1.0)
+
   def test_estimate_size(self):
     source = ConcatSource([
         RangeSource(0, 10),