You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/13 21:17:02 UTC
[1/2] beam git commit: Fix flakiness in sideinputs_test
Repository: beam
Updated Branches:
refs/heads/master 2df25db34 -> 1b3f1c1f4
Fix flakiness in sideinputs_test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebeb3fd9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebeb3fd9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebeb3fd9
Branch: refs/heads/master
Commit: ebeb3fd998c1a6a8ce4f9514d54d9e28a6619aa0
Parents: 2df25db
Author: Charles Chen <cc...@google.com>
Authored: Mon Nov 13 11:41:47 2017 -0800
Committer: Charles Chen <cc...@google.com>
Committed: Mon Nov 13 11:56:07 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/worker/sideinputs.py | 22 ++++++++++----------
.../runners/worker/sideinputs_test.py | 1 +
2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index c91fe95..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -129,17 +129,17 @@ class PrefetchingSourceSetIterable(object):
     num_readers_finished = 0
     try:
       while True:
-        element = self.element_queue.get()
-        if element is READER_THREAD_IS_DONE_SENTINEL:
-          num_readers_finished += 1
-          if num_readers_finished == self.num_reader_threads:
-            if self.has_errored:
-              raise self.reader_exceptions.get()
-            return
-        elif self.has_errored:
-          raise self.reader_exceptions.get()
-        else:
-          yield element
+        try:
+          element = self.element_queue.get()
+          if element is READER_THREAD_IS_DONE_SENTINEL:
+            num_readers_finished += 1
+            if num_readers_finished == self.num_reader_threads:
+              return
+          else:
+            yield element
+        finally:
+          if self.has_errored:
+            raise self.reader_exceptions.get()
     except GeneratorExit:
       self.has_errored = True
       raise
http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 73d34fb..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -121,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     def perpetual_generator(value):
       while True:
         yield value
+        time.sleep(0.1)
     sources = [
         FakeSource(perpetual_generator(1)),
[2/2] beam git commit: This closes #4122
Posted by ch...@apache.org.
This closes #4122
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b3f1c1f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b3f1c1f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b3f1c1f
Branch: refs/heads/master
Commit: 1b3f1c1f4f242514ff117d80307f5ff56c9e703a
Parents: 2df25db ebeb3fd
Author: chamikara@google.com <ch...@google.com>
Authored: Mon Nov 13 13:16:42 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Nov 13 13:16:42 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/worker/sideinputs.py | 22 ++++++++++----------
.../runners/worker/sideinputs_test.py | 1 +
2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------