You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/13 20:47:27 UTC

[beam] branch release-2.21.0 updated: [BEAM-9945] Ensure that the read index represents the number of fully processed elements including at the end of the channel or after splitting.

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new f0d1453  [BEAM-9945] Ensure that the read index represents the number of fully processed elements including at the end of the channel or after splitting.
     new 8f387f0  Merge pull request #11697 from ibzib/BEAM-9945
f0d1453 is described below

commit f0d1453c7d1fe87780c71b9b2ecdf85724faa9f1
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Wed May 13 15:57:14 2020 -0400

    [BEAM-9945] Ensure that the read index represents the number of fully processed elements including at the end of the channel or after splitting.
---
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index aca7bff..e4098ef 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -191,14 +191,14 @@ class DataInputOperation(RunnerIOOperation):
             self.windowed_coder)
     ]
     self.splitting_lock = threading.Lock()
+    self.index = -1
+    self.stop = float('inf')
     self.started = False
 
   def start(self):
     # type: () -> None
     super(DataInputOperation, self).start()
     with self.splitting_lock:
-      self.index = -1
-      self.stop = float('inf')
       self.started = True
 
   def process(self, windowed_value):
@@ -317,8 +317,15 @@ class DataInputOperation(RunnerIOOperation):
   def finish(self):
     # type: () -> None
     with self.splitting_lock:
+      self.index += 1
       self.started = False
 
+  def reset(self):
+    # type: () -> None
+    self.index = -1
+    self.stop = float('inf')
+    super(DataInputOperation, self).reset()
+
 
 class _StateBackedIterable(object):
   def __init__(self,