You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/21 08:31:02 UTC

[1/2] beam git commit: Update DataflowPipelineResult.state at the end of poll_for_job_completion.

Repository: beam
Updated Branches:
  refs/heads/python-sdk c03e6f375 -> 946135f6a


Update DataflowPipelineResult.state at the end of poll_for_job_completion.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56512ab4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56512ab4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56512ab4

Branch: refs/heads/python-sdk
Commit: 56512ab442c599c64bfdb9fc6cabce95d76ee4dc
Parents: c03e6f3
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 23:43:42 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jan 20 23:43:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py      | 6 ++++--
 sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index bd25dbf..31d3386 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -78,7 +78,7 @@ class DataflowRunner(PipelineRunner):
     return 's%s' % self._unique_step_id
 
   @staticmethod
-  def poll_for_job_completion(runner, job_id):
+  def poll_for_job_completion(runner, result):
     """Polls for the specified job to finish running (successfully or not)."""
     last_message_time = None
     last_message_id = None
@@ -101,6 +101,7 @@ class DataflowRunner(PipelineRunner):
       else:
         return 0
 
+    job_id = result.job_id()
     while True:
       response = runner.dataflow_client.get_job(job_id)
       # If get() is called very soon after Create() the response may not contain
@@ -151,6 +152,7 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
+    result._job = response
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -694,7 +696,7 @@ class DataflowPipelineResult(PipelineResult):
 
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
-          args=(self._runner, self.job_id()))
+          args=(self._runner, self))
 
       # Mark the thread as a daemon thread so a keyboard interrupt on the main
       # thread will terminate everything. This is also the reason we will not

http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
index a935c98..4983899 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -47,12 +47,11 @@ class DataflowRunnerTest(unittest.TestCase):
         self.dataflow_client.list_messages = mock.MagicMock(
             return_value=([], None))
 
-    with self.assertRaises(DataflowRuntimeException) as e:
+    with self.assertRaisesRegexp(
+        DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
       failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
       failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
       failed_result.wait_until_finish()
-    self.assertTrue(
-        'Dataflow pipeline failed. State: FAILED' in e.exception.message)
 
     succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
     succeeded_result = DataflowPipelineResult(


[2/2] beam git commit: Closes #1809

Posted by ro...@apache.org.
Closes #1809


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/946135f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/946135f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/946135f6

Branch: refs/heads/python-sdk
Commit: 946135f6a955d9e27e7553c4cefef354ecd2535d
Parents: c03e6f3 56512ab
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 00:30:35 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jan 21 00:30:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py      | 6 ++++--
 sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------