You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/10 23:26:32 UTC

[1/2] beam git commit: Properly handle side input exception when all reader threads complete

Repository: beam
Updated Branches:
  refs/heads/master a0eb00e73 -> 51c938d4e


Properly handle side input exception when all reader threads complete


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6779b8ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6779b8ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6779b8ec

Branch: refs/heads/master
Commit: 6779b8ec0e872de86ed13fdfc9b260f69f44dfab
Parents: a0eb00e
Author: Charles Chen <cc...@google.com>
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Nov 10 15:21:15 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py      |  3 +++
 .../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..c91fe95 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
       self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
 
   def __iter__(self):
+    # pylint: disable=too-many-nested-blocks
     if self.already_iterated:
       raise RuntimeError(
           'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -132,6 +133,8 @@ class PrefetchingSourceSetIterable(object):
         if element is READER_THREAD_IS_DONE_SENTINEL:
           num_readers_finished += 1
           if num_readers_finished == self.num_reader_threads:
+            if self.has_errored:
+              raise self.reader_exceptions.get()
             return
         elif self.has_errored:
           raise self.reader_exceptions.get()

http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..73d34fb 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=1)
     assert list(strip_windows(iterator_fn())) == range(11)
 
+  def test_source_iterator_single_source_exception(self):
+    class MyException(Exception):
+      pass
+
+    def exception_generator():
+      yield 0
+      raise MyException('I am an exception!')
+
+    sources = [
+        FakeSource(exception_generator()),
+    ]
+    iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+    seen = set()
+    with self.assertRaises(MyException):
+      for value in iterator_fn():
+        seen.add(value.value)
+    self.assertEqual(sorted(seen), [0])
+
   def test_source_iterator_fn_exception(self):
     class MyException(Exception):
       pass


[2/2] beam git commit: This closes #4112

Posted by ch...@apache.org.
This closes #4112


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51c938d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51c938d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51c938d4

Branch: refs/heads/master
Commit: 51c938d4e32dcc3b2c6b4e847f481e3e00d5e4b0
Parents: a0eb00e 6779b8e
Author: chamikara@google.com <ch...@google.com>
Authored: Fri Nov 10 15:26:05 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Nov 10 15:26:05 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py      |  3 +++
 .../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------