Posted to commits@beam.apache.org by al...@apache.org on 2017/08/27 03:27:45 UTC

[1/2] beam git commit: Implement cancel and wait_until_finish(duration) for DataflowRunner

Repository: beam
Updated Branches:
  refs/heads/master 456dcc117 -> d50c9644e


Implement cancel and wait_until_finish(duration) for DataflowRunner
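
Below is a minimal usage sketch of the new API surface. It is
illustrative only: the pipeline contents and the --project and
--temp_location values are placeholder assumptions, not part of this
change.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.runner import PipelineState

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',                # placeholder project id
        '--temp_location=gs://my-bucket/tmp',  # placeholder bucket
    ])
    p = beam.Pipeline(options=options)
    p | beam.Create([1, 2, 3])
    result = p.run()

    # duration is in milliseconds. Returns the current job state, which
    # may not be terminal if the wait timed out.
    state = result.wait_until_finish(duration=60000)
    if state == PipelineState.RUNNING:
      # Still running after one minute; ask the service to cancel.
      result.cancel()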


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f71dc41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f71dc41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f71dc41

Branch: refs/heads/master
Commit: 8f71dc41b30a978095ca0e0699009e4f4445a618
Parents: 456dcc1
Author: Ahmet Altay <al...@google.com>
Authored: Wed Aug 23 18:33:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 26 20:26:55 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 96 +++++++++++++++++---
 .../runners/dataflow/dataflow_runner_test.py    | 80 +++++++++++++++-
 sdks/python/apache_beam/runners/runner.py       |  3 +
 3 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 880901e..813759e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -87,8 +87,18 @@ class DataflowRunner(PipelineRunner):
     return 's%s' % self._unique_step_id
 
   @staticmethod
-  def poll_for_job_completion(runner, result):
-    """Polls for the specified job to finish running (successfully or not)."""
+  def poll_for_job_completion(runner, result, duration):
+    """Polls for the specified job to finish running (successfully or not).
+
+    Updates the result with the new job information before returning.
+
+    Args:
+      runner: DataflowRunner instance to use for polling job state.
+      result: DataflowPipelineResult instance used for job information.
+      duration (int): The time to wait (in milliseconds) for the job to
+        finish. If it is set to :data:`None`, it will wait indefinitely
+        until the job is finished.
+    """
     last_message_time = None
     last_message_hash = None
 
@@ -109,6 +119,10 @@ class DataflowRunner(PipelineRunner):
         return 1
       return 0
 
+    if duration:
+      start_secs = time.time()
+      duration_secs = duration / 1000.0  # duration is in milliseconds
+
     job_id = result.job_id()
     while True:
       response = runner.dataflow_client.get_job(job_id)
@@ -161,6 +175,13 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
+      if duration:
+        passed_secs = time.time() - start_secs
+        if passed_secs > duration_secs:
+          logging.warning('Timed out while waiting for job %s after %d '
+                          'seconds', job_id, passed_secs)
+          break
+
     result._job = response
     runner.last_error_msg = last_error_msg
 
@@ -280,7 +301,10 @@ class DataflowRunner(PipelineRunner):
     self.dataflow_client = apiclient.DataflowApplicationClient(
         pipeline._options)
 
-    # Create the job
+    # Create the job description and send a request to the service. The job
+    # returned by create_job can be None if there is no need to send a request
+    # to the service (e.g. template creation). If a request was sent and
+    # failed, the call raises an exception.
     result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job), self)
 
@@ -838,11 +862,23 @@ class DataflowPipelineResult(PipelineResult):
   """Represents the state of a pipeline run on the Dataflow service."""
 
   def __init__(self, job, runner):
-    """Job is a Job message from the Dataflow API."""
+    """Initialize a new DataflowPipelineResult instance.
+
+    Args:
+      job: Job message from the Dataflow API. Could be :data:`None` if a job
+        request was not sent to the Dataflow service (e.g. template jobs).
+      runner: DataflowRunner instance.
+    """
     self._job = job
     self._runner = runner
     self.metric_results = None
 
+  def _update_job(self):
+    # We need the job id to be able to update job information. There is no need
+    # to update the job if we are in a known terminal state.
+    if self.has_job and not self._is_in_terminal_state():
+      self._job = self._runner.dataflow_client.get_job(self.job_id())
+
   def job_id(self):
     return self._job.id
 
@@ -863,7 +899,12 @@ class DataflowPipelineResult(PipelineResult):
     if not self.has_job:
       return PipelineState.UNKNOWN
 
+    self._update_job()
+
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+    # TODO: Move this table to another location.
+    # Ordered by the enum values.
     api_jobstate_map = {
         values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
         values_enum.JOB_STATE_STOPPED: PipelineState.STOPPED,
@@ -874,6 +915,8 @@ class DataflowPipelineResult(PipelineResult):
         values_enum.JOB_STATE_UPDATED: PipelineState.UPDATED,
         values_enum.JOB_STATE_DRAINING: PipelineState.DRAINING,
         values_enum.JOB_STATE_DRAINED: PipelineState.DRAINED,
+        values_enum.JOB_STATE_PENDING: PipelineState.PENDING,
+        values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
     }
 
     return (api_jobstate_map[self._job.currentState] if self._job.currentState
@@ -883,21 +926,20 @@ class DataflowPipelineResult(PipelineResult):
     if not self.has_job:
       return True
 
-    return self.state in [
-        PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
-        PipelineState.CANCELLED, PipelineState.DRAINED]
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+    return self._job.currentState in [
+        values_enum.JOB_STATE_STOPPED, values_enum.JOB_STATE_DONE,
+        values_enum.JOB_STATE_FAILED, values_enum.JOB_STATE_CANCELLED,
+        values_enum.JOB_STATE_DRAINED]
 
   def wait_until_finish(self, duration=None):
     if not self._is_in_terminal_state():
       if not self.has_job:
         raise IOError('Failed to get the Dataflow job id.')
-      if duration:
-        raise NotImplementedError(
-            'DataflowRunner does not support duration argument.')
 
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
-          args=(self._runner, self))
+          args=(self._runner, self, duration))
 
       # Mark the thread as a daemon thread so a keyboard interrupt on the main
       # thread will terminate everything. This is also the reason we will not
@@ -906,14 +948,40 @@ class DataflowPipelineResult(PipelineResult):
       thread.start()
       while thread.isAlive():
         time.sleep(5.0)
-      if self.state != PipelineState.DONE:
-        # TODO(BEAM-1290): Consider converting this to an error log based on the
-        # resolution of the issue.
+
+      terminated = self._is_in_terminal_state()
+      assert duration or terminated, (
+          'Job did not reach a terminal state after waiting indefinitely.')
+
+      if terminated and self.state != PipelineState.DONE:
+        # TODO(BEAM-1290): Consider converting this to an error log based on
+        # the resolution of the issue.
         raise DataflowRuntimeException(
             'Dataflow pipeline failed. State: %s, Error:\n%s' %
             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
     return self.state
 
+  def cancel(self):
+    if not self.has_job:
+      raise IOError('Failed to get the Dataflow job id.')
+
+    self._update_job()
+
+    if self._is_in_terminal_state():
+      logging.warning(
+          'Cancel failed because job %s is already terminated in state %s.',
+          self.job_id(), self.state)
+    else:
+      if not self._runner.dataflow_client.modify_job_state(
+          self.job_id(), 'JOB_STATE_CANCELLED'):
+        cancel_failed_message = (
+            'Failed to cancel job %s, please go to the Developers Console to '
+            'cancel it manually.') % self.job_id()
+        logging.error(cancel_failed_message)
+        raise DataflowRuntimeException(cancel_failed_message, self)
+
+    return self.state
+
   def __str__(self):
     return '<%s %s %s>' % (
         self.__class__.__name__,
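
A usage note for the new cancel() method above, as a hedged sketch: it
assumes `result` is a DataflowPipelineResult for a job that was actually
submitted to the service.

    import logging

    from apache_beam.runners.dataflow.dataflow_runner import (
        DataflowRuntimeException)

    try:
      # Cancellation is asynchronous: on success the job typically moves
      # through CANCELLING before reaching the terminal CANCELLED state,
      # so the returned state may not yet be terminal.
      state = result.cancel()
      logging.info('Job is now in state %s.', state)
    except DataflowRuntimeException:
      # Raised when the service rejects the cancel request; the error
      # message suggests cancelling manually from the Developers Console.
      logging.exception('Cancellation failed.')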

http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 80414d6..8e708e6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -35,6 +35,7 @@ from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.core import _GroupByKeyOnly
@@ -68,13 +69,17 @@ class DataflowRunnerTest(unittest.TestCase):
 
     class MockDataflowRunner(object):
 
-      def __init__(self, final_state):
+      def __init__(self, states):
         self.dataflow_client = mock.MagicMock()
         self.job = mock.MagicMock()
         self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+        self._states = states
+        self._next_state_index = 0
 
         def get_job_side_effect(*args, **kwargs):
-          self.job.currentState = final_state
+          self.job.currentState = self._states[self._next_state_index]
+          if self._next_state_index < (len(self._states) - 1):
+            self._next_state_index += 1
           return mock.DEFAULT
 
         self.dataflow_client.get_job = mock.MagicMock(
@@ -84,14 +89,79 @@ class DataflowRunnerTest(unittest.TestCase):
 
     with self.assertRaisesRegexp(
         DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
-      failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+      failed_runner = MockDataflowRunner([values_enum.JOB_STATE_FAILED])
       failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
       failed_result.wait_until_finish()
 
-    succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+    succeeded_runner = MockDataflowRunner([values_enum.JOB_STATE_DONE])
     succeeded_result = DataflowPipelineResult(
         succeeded_runner.job, succeeded_runner)
-    succeeded_result.wait_until_finish()
+    result = succeeded_result.wait_until_finish()
+    self.assertEqual(result, PipelineState.DONE)
+
+    @mock.patch('time.time', mock.MagicMock(side_effect=[1, 2, 3]))
+    def _duration_succeeded():
+      duration_succeeded_runner = MockDataflowRunner(
+          [values_enum.JOB_STATE_RUNNING, values_enum.JOB_STATE_DONE])
+      duration_succeeded_result = DataflowPipelineResult(
+          duration_succeeded_runner.job, duration_succeeded_runner)
+      result = duration_succeeded_result.wait_until_finish(5)
+      self.assertEqual(result, PipelineState.DONE)
+    _duration_succeeded()
+
+    @mock.patch('time.time', mock.MagicMock(side_effect=[1, 10, 20]))
+    def _duration_timedout():
+      duration_timedout_runner = MockDataflowRunner(
+          [values_enum.JOB_STATE_RUNNING])
+      duration_timedout_result = DataflowPipelineResult(
+          duration_timedout_runner.job, duration_timedout_runner)
+      result = duration_timedout_result.wait_until_finish(5)
+      self.assertEqual(result, PipelineState.RUNNING)
+    _duration_timedout()
+
+    @mock.patch('time.time', mock.MagicMock(side_effect=[1, 2, 3]))
+    def _duration_failed():
+      with self.assertRaisesRegexp(
+          DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
+        duration_failed_runner = MockDataflowRunner(
+            [values_enum.JOB_STATE_FAILED])
+        duration_failed_result = DataflowPipelineResult(
+            duration_failed_runner.job, duration_failed_runner)
+        duration_failed_result.wait_until_finish(5)
+    _duration_failed()
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_cancel(self, patched_time_sleep):
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+    class MockDataflowRunner(object):
+
+      def __init__(self, state, cancel_result):
+        self.dataflow_client = mock.MagicMock()
+        self.job = mock.MagicMock()
+        self.job.currentState = state
+
+        self.dataflow_client.get_job = mock.MagicMock(return_value=self.job)
+        self.dataflow_client.modify_job_state = mock.MagicMock(
+            return_value=cancel_result)
+        self.dataflow_client.list_messages = mock.MagicMock(
+            return_value=([], None))
+
+    with self.assertRaisesRegexp(
+        DataflowRuntimeException, 'Failed to cancel job'):
+      failed_runner = MockDataflowRunner(values_enum.JOB_STATE_RUNNING, False)
+      failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+      failed_result.cancel()
+
+    succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_RUNNING, True)
+    succeeded_result = DataflowPipelineResult(
+        succeeded_runner.job, succeeded_runner)
+    succeeded_result.cancel()
+
+    terminal_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE, False)
+    terminal_result = DataflowPipelineResult(
+        terminal_runner.job, terminal_runner)
+    terminal_result.cancel()
 
   def test_create_runner(self):
     self.assertTrue(
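
The tests above simulate elapsed wall-clock time by patching time.time
with a side_effect list. A minimal standalone sketch of the same
technique (the function name is illustrative):

    import time

    import mock

    @mock.patch('time.time', mock.MagicMock(side_effect=[0, 10]))
    def _elapsed_secs():
      start_secs = time.time()         # first patched value: 0
      return time.time() - start_secs  # second patched value: 10

    assert _elapsed_secs() == 10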

http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index a3c6b34..43ee27b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -298,6 +298,9 @@ class PipelineState(object):
   UPDATED = 'UPDATED'  # replaced by another job (terminal state)
   DRAINING = 'DRAINING'  # still processing, no longer reading data
   DRAINED = 'DRAINED'  # draining completed (terminal state)
+  PENDING = 'PENDING'  # the job has been created but is not yet running
+  CANCELLING = 'CANCELLING'  # job has been explicitly cancelled and is
+                             # in the process of stopping
 
 
 class PipelineResult(object):
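
A short sketch of how the two new non-terminal states can surface when
inspecting a result (`result` is assumed to come from a DataflowRunner
run):

    from apache_beam.runners.runner import PipelineState

    # PENDING precedes RUNNING; CANCELLING follows a cancel() request
    # and precedes the terminal CANCELLED state.
    if result.state in (PipelineState.PENDING, PipelineState.CANCELLING):
      # The job is still transitioning; keep waiting for a terminal state.
      result.wait_until_finish()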


[2/2] beam git commit: This closes #3757

Posted by al...@apache.org.
This closes #3757


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d50c9644
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d50c9644
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d50c9644

Branch: refs/heads/master
Commit: d50c9644e95fdd99fcf12c1afdf2822fe4b2b3e2
Parents: 456dcc1 8f71dc4
Author: Ahmet Altay <al...@google.com>
Authored: Sat Aug 26 20:27:28 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 26 20:27:28 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 96 +++++++++++++++++---
 .../runners/dataflow/dataflow_runner_test.py    | 80 +++++++++++++++-
 sdks/python/apache_beam/runners/runner.py       |  3 +
 3 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------