You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:07 UTC
[16/50] [abbrv] beam git commit: Fix flakiness in sideinputs_test
Fix flakiness in sideinputs_test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebeb3fd9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebeb3fd9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebeb3fd9
Branch: refs/heads/tez-runner
Commit: ebeb3fd998c1a6a8ce4f9514d54d9e28a6619aa0
Parents: 2df25db
Author: Charles Chen <cc...@google.com>
Authored: Mon Nov 13 11:41:47 2017 -0800
Committer: Charles Chen <cc...@google.com>
Committed: Mon Nov 13 11:56:07 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/worker/sideinputs.py | 22 ++++++++++----------
.../runners/worker/sideinputs_test.py | 1 +
2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index c91fe95..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -129,17 +129,17 @@ class PrefetchingSourceSetIterable(object):
num_readers_finished = 0
try:
while True:
- element = self.element_queue.get()
- if element is READER_THREAD_IS_DONE_SENTINEL:
- num_readers_finished += 1
- if num_readers_finished == self.num_reader_threads:
- if self.has_errored:
- raise self.reader_exceptions.get()
- return
- elif self.has_errored:
- raise self.reader_exceptions.get()
- else:
- yield element
+ try:
+ element = self.element_queue.get()
+ if element is READER_THREAD_IS_DONE_SENTINEL:
+ num_readers_finished += 1
+ if num_readers_finished == self.num_reader_threads:
+ return
+ else:
+ yield element
+ finally:
+ if self.has_errored:
+ raise self.reader_exceptions.get()
except GeneratorExit:
self.has_errored = True
raise
http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 73d34fb..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -121,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
def perpetual_generator(value):
while True:
yield value
+ time.sleep(0.1)
sources = [
FakeSource(perpetual_generator(1)),