You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/10 19:57:03 UTC
[2/2] beam git commit: [BEAM-2240] Always augment exception with step
name.
[BEAM-2240] Always augment exception with step name.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/baa17835
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/baa17835
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/baa17835
Branch: refs/heads/master
Commit: baa178359ced13ef50f8f8ba398567836bbb63a0
Parents: df1704b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 9 15:19:34 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed May 10 12:56:40 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 2 +-
sdks/python/apache_beam/runners/common.py | 24 ++++++++++++++------
.../portability/maptask_executor_runner_test.py | 19 ++++++++++++++--
3 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/baa17835/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c6b1e48..8aa8a8a 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -278,7 +278,7 @@ class PipelineTest(unittest.TestCase):
with self.assertRaises(ValueError):
with Pipeline() as p:
# pylint: disable=expression-not-assigned
- p | Create([ValueError]) | Map(raise_exception)
+ p | Create([ValueError('msg')]) | Map(raise_exception)
# TODO(BEAM-1894).
# def test_eager_pipeline(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/baa17835/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0aef0a1..86db711 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -20,6 +20,7 @@
"""Worker operations executor."""
import sys
+import traceback
from apache_beam.internal import util
from apache_beam.metrics.execution import ScopedMetricsContainer
@@ -409,13 +410,22 @@ class DoFnRunner(Receiver):
def _reraise_augmented(self, exn):
if getattr(exn, '_tagged_with_step', False) or not self.step_name:
raise
- args = exn.args
- if args and isinstance(args[0], str):
- args = (args[0] + " [while running '%s']" % self.step_name,) + args[1:]
- # Poor man's exception chaining.
- raise type(exn), args, sys.exc_info()[2]
- else:
- raise
+ step_annotation = " [while running '%s']" % self.step_name
+ # To emulate exception chaining (not available in Python 2).
+ original_traceback = sys.exc_info()[2]
+ try:
+ # Attempt to construct the same kind of exception
+ # with an augmented message.
+ new_exn = type(exn)(exn.args[0] + step_annotation, *exn.args[1:])
+ new_exn._tagged_with_step = True # Could raise attribute error.
+ except: # pylint: disable=bare-except
+ # If anything goes wrong, construct a RuntimeError whose message
+ # records the original exception's type and message.
+ new_exn = RuntimeError(
+ traceback.format_exception_only(type(exn), exn)[-1].strip()
+ + step_annotation)
+ new_exn._tagged_with_step = True
+ raise new_exn, None, original_traceback
class _OutputProcessor(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/baa17835/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index aebd2e1..062e6f9 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -41,9 +41,9 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
return beam.Pipeline(runner=maptask_executor_runner.MapTaskExecutorRunner())
def test_assert_that(self):
- with self.assertRaises(BeamAssertException):
+ with self.assertRaisesRegexp(BeamAssertException, 'bad_assert'):
with self.create_pipeline() as p:
- assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+ assert_that(p | beam.Create(['a', 'b']), equal_to(['a']), 'bad_assert')
def test_create(self):
with self.create_pipeline() as p:
@@ -204,6 +204,21 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
| beam.Map(lambda (k, vs): (k, sorted(vs))))
assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
+ def test_errors(self):
+ with self.assertRaises(BaseException) as e_cm:
+ with self.create_pipeline() as p:
+ def raise_error(x):
+ raise RuntimeError('x')
+ # pylint: disable=expression-not-assigned
+ (p
+ | beam.Create(['a', 'b'])
+ | 'StageA' >> beam.Map(lambda x: x)
+ | 'StageB' >> beam.Map(lambda x: x)
+ | 'StageC' >> beam.Map(raise_error)
+ | 'StageD' >> beam.Map(lambda x: x))
+ self.assertIn('StageC', e_cm.exception.args[0])
+ self.assertNotIn('StageB', e_cm.exception.args[0])
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)