You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/21 08:31:02 UTC
[1/2] beam git commit: Update DataflowPipelineResult.state at the end of poll_for_job_completion.
Repository: beam
Updated Branches:
refs/heads/python-sdk c03e6f375 -> 946135f6a
Update DataflowPipelineResult.state at the end of poll_for_job_completion.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56512ab4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56512ab4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56512ab4
Branch: refs/heads/python-sdk
Commit: 56512ab442c599c64bfdb9fc6cabce95d76ee4dc
Parents: c03e6f3
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 23:43:42 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jan 20 23:43:42 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/dataflow_runner.py | 6 ++++--
sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index bd25dbf..31d3386 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -78,7 +78,7 @@ class DataflowRunner(PipelineRunner):
return 's%s' % self._unique_step_id
@staticmethod
- def poll_for_job_completion(runner, job_id):
+ def poll_for_job_completion(runner, result):
"""Polls for the specified job to finish running (successfully or not)."""
last_message_time = None
last_message_id = None
@@ -101,6 +101,7 @@ class DataflowRunner(PipelineRunner):
else:
return 0
+ job_id = result.job_id()
while True:
response = runner.dataflow_client.get_job(job_id)
# If get() is called very soon after Create() the response may not contain
@@ -151,6 +152,7 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
+ result._job = response
runner.last_error_msg = last_error_msg
def run(self, pipeline):
@@ -694,7 +696,7 @@ class DataflowPipelineResult(PipelineResult):
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
- args=(self._runner, self.job_id()))
+ args=(self._runner, self))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
index a935c98..4983899 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -47,12 +47,11 @@ class DataflowRunnerTest(unittest.TestCase):
self.dataflow_client.list_messages = mock.MagicMock(
return_value=([], None))
- with self.assertRaises(DataflowRuntimeException) as e:
+ with self.assertRaisesRegexp(
+ DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
failed_result.wait_until_finish()
- self.assertTrue(
- 'Dataflow pipeline failed. State: FAILED' in e.exception.message)
succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
succeeded_result = DataflowPipelineResult(
[2/2] beam git commit: Closes #1809
Posted by ro...@apache.org.
Closes #1809
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/946135f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/946135f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/946135f6
Branch: refs/heads/python-sdk
Commit: 946135f6a955d9e27e7553c4cefef354ecd2535d
Parents: c03e6f3 56512ab
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 00:30:35 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jan 21 00:30:35 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/dataflow_runner.py | 6 ++++--
sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------