You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/13 21:17:02 UTC

[1/2] beam git commit: Fix flakiness in sideinputs_test

Repository: beam
Updated Branches:
  refs/heads/master 2df25db34 -> 1b3f1c1f4


Fix flakiness in sideinputs_test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebeb3fd9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebeb3fd9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebeb3fd9

Branch: refs/heads/master
Commit: ebeb3fd998c1a6a8ce4f9514d54d9e28a6619aa0
Parents: 2df25db
Author: Charles Chen <cc...@google.com>
Authored: Mon Nov 13 11:41:47 2017 -0800
Committer: Charles Chen <cc...@google.com>
Committed: Mon Nov 13 11:56:07 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py    | 22 ++++++++++----------
 .../runners/worker/sideinputs_test.py           |  1 +
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index c91fe95..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -129,17 +129,17 @@ class PrefetchingSourceSetIterable(object):
     num_readers_finished = 0
     try:
       while True:
-        element = self.element_queue.get()
-        if element is READER_THREAD_IS_DONE_SENTINEL:
-          num_readers_finished += 1
-          if num_readers_finished == self.num_reader_threads:
-            if self.has_errored:
-              raise self.reader_exceptions.get()
-            return
-        elif self.has_errored:
-          raise self.reader_exceptions.get()
-        else:
-          yield element
+        try:
+          element = self.element_queue.get()
+          if element is READER_THREAD_IS_DONE_SENTINEL:
+            num_readers_finished += 1
+            if num_readers_finished == self.num_reader_threads:
+              return
+          else:
+            yield element
+        finally:
+          if self.has_errored:
+            raise self.reader_exceptions.get()
     except GeneratorExit:
       self.has_errored = True
       raise

http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 73d34fb..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -121,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     def perpetual_generator(value):
       while True:
         yield value
+        time.sleep(0.1)
 
     sources = [
         FakeSource(perpetual_generator(1)),


[2/2] beam git commit: This closes #4122

Posted by ch...@apache.org.
This closes #4122


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b3f1c1f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b3f1c1f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b3f1c1f

Branch: refs/heads/master
Commit: 1b3f1c1f4f242514ff117d80307f5ff56c9e703a
Parents: 2df25db ebeb3fd
Author: chamikara@google.com <ch...@google.com>
Authored: Mon Nov 13 13:16:42 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Nov 13 13:16:42 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py    | 22 ++++++++++----------
 .../runners/worker/sideinputs_test.py           |  1 +
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------