You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/11/21 21:41:35 UTC
[1/2] beam git commit: Add logging and test aborting for timeouts.
Repository: beam
Updated Branches:
refs/heads/master fb41b2950 -> e9d746a34
Add logging and test aborting for timeouts.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/768c854f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/768c854f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/768c854f
Branch: refs/heads/master
Commit: 768c854f3723dd747e905aeb0d35024ae44e2cda
Parents: fb41b29
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 12:25:18 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 12:25:18 2017 -0800
----------------------------------------------------------------------
.../portability/universal_local_runner.py | 5 ++-
.../portability/universal_local_runner_test.py | 32 ++++++++++++++++++--
2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 579983c..b951194 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -178,7 +178,9 @@ class PipelineResult(runner.PipelineResult):
for message in self._job_service.GetMessageStream(
beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
self._messages.append(message)
- threading.Thread(target=read_messages).start()
+ t = threading.Thread(target=read_messages, name='wait_until_finish_read')
+ t.daemon = True
+ t.start()
for state_response in self._job_service.GetStateStream(
beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
@@ -244,6 +246,7 @@ class BeamJob(threading.Thread):
logging.exception("Error running pipeline.")
traceback.print_exc()
self.state = beam_job_api_pb2.JobState.FAILED
+ raise
def cancel(self):
if self.state not in TERMINAL_STATES:
http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
index e1104dc..1fc244b 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
@@ -16,6 +16,11 @@
#
import logging
+import platform
+import signal
+import sys
+import threading
+import traceback
import unittest
import apache_beam as beam
@@ -27,9 +32,31 @@ from apache_beam.testing.util import equal_to
class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
+ TIMEOUT_SECS = 30
+
_use_grpc = False
_use_subprocesses = False
+ def setUp(self):
+ if platform.system() != 'Windows':
+ def handler(signum, frame):
+ msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
+ print '=' * 20, msg, '=' * 20
+ traceback.print_stack(frame)
+ threads_by_id = {th.ident: th for th in threading.enumerate()}
+ for thread_id, stack in sys._current_frames().items():
+ th = threads_by_id.get(thread_id)
+ print
+ print '# Thread:', th or thread_id
+ traceback.print_stack(stack)
+ raise BaseException(msg)
+ signal.signal(signal.SIGALRM, handler)
+ signal.alarm(self.TIMEOUT_SECS)
+
+ def tearDown(self):
+ if platform.system() != 'Windows':
+ signal.alarm(0)
+
@classmethod
def get_runner(cls):
# Don't inherit.
@@ -41,7 +68,8 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
@classmethod
def tearDownClass(cls):
- cls._runner.cleanup()
+ if hasattr(cls, '_runner'):
+ cls._runner.cleanup()
def create_pipeline(self):
return beam.Pipeline(self.get_runner())
@@ -56,7 +84,7 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
def test_errors(self):
# TODO: figure out a way for runner to parse and raise the
# underlying exception.
- with self.assertRaises(BaseException):
+ with self.assertRaises(Exception):
with self.create_pipeline() as p:
def raise_error(x):
raise RuntimeError('x')
[2/2] beam git commit: Closes #4159
Posted by ro...@apache.org.
Closes #4159
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9d746a3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9d746a3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9d746a3
Branch: refs/heads/master
Commit: e9d746a34a82791571b002a78c62e9f46efd4253
Parents: fb41b29 768c854
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 13:41:13 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 13:41:13 2017 -0800
----------------------------------------------------------------------
.../portability/universal_local_runner.py | 5 ++-
.../portability/universal_local_runner_test.py | 32 ++++++++++++++++++--
2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------