You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/10 23:26:32 UTC
[1/2] beam git commit: Properly handle side input exception when all
reader threads complete
Repository: beam
Updated Branches:
refs/heads/master a0eb00e73 -> 51c938d4e
Properly handle side input exception when all reader threads complete
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6779b8ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6779b8ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6779b8ec
Branch: refs/heads/master
Commit: 6779b8ec0e872de86ed13fdfc9b260f69f44dfab
Parents: a0eb00e
Author: Charles Chen <cc...@google.com>
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Nov 10 15:21:15 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/worker/sideinputs.py | 3 +++
.../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
2 files changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..c91fe95 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
def __iter__(self):
+ # pylint: disable=too-many-nested-blocks
if self.already_iterated:
raise RuntimeError(
'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -132,6 +133,8 @@ class PrefetchingSourceSetIterable(object):
if element is READER_THREAD_IS_DONE_SENTINEL:
num_readers_finished += 1
if num_readers_finished == self.num_reader_threads:
+ if self.has_errored:
+ raise self.reader_exceptions.get()
return
elif self.has_errored:
raise self.reader_exceptions.get()
http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..73d34fb 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
sources, max_reader_threads=1)
assert list(strip_windows(iterator_fn())) == range(11)
+ def test_source_iterator_single_source_exception(self):
+ class MyException(Exception):
+ pass
+
+ def exception_generator():
+ yield 0
+ raise MyException('I am an exception!')
+
+ sources = [
+ FakeSource(exception_generator()),
+ ]
+ iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+ seen = set()
+ with self.assertRaises(MyException):
+ for value in iterator_fn():
+ seen.add(value.value)
+ self.assertEqual(sorted(seen), [0])
+
def test_source_iterator_fn_exception(self):
class MyException(Exception):
pass
[2/2] beam git commit: This closes #4112
Posted by ch...@apache.org.
This closes #4112
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51c938d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51c938d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51c938d4
Branch: refs/heads/master
Commit: 51c938d4e32dcc3b2c6b4e847f481e3e00d5e4b0
Parents: a0eb00e 6779b8e
Author: chamikara@google.com <ch...@google.com>
Authored: Fri Nov 10 15:26:05 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri Nov 10 15:26:05 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/worker/sideinputs.py | 3 +++
.../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
2 files changed, 21 insertions(+)
----------------------------------------------------------------------