Posted to commits@beam.apache.org by al...@apache.org on 2017/08/27 03:27:45 UTC
[1/2] beam git commit: Implement cancel and wait_until_finish(duration) for DataflowRunner
Repository: beam
Updated Branches:
refs/heads/master 456dcc117 -> d50c9644e
Implement cancel and wait_until_finish(duration) for DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f71dc41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f71dc41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f71dc41
Branch: refs/heads/master
Commit: 8f71dc41b30a978095ca0e0699009e4f4445a618
Parents: 456dcc1
Author: Ahmet Altay <al...@google.com>
Authored: Wed Aug 23 18:33:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 26 20:26:55 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_runner.py | 96 +++++++++++++++++---
.../runners/dataflow/dataflow_runner_test.py | 80 +++++++++++++++-
sdks/python/apache_beam/runners/runner.py | 3 +
3 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
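For orientation, a minimal usage sketch of the API surface this commit adds
(not part of the commit itself; the project name and pipeline contents are
hypothetical):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.runner import PipelineState

    # Hypothetical options; any valid Dataflow setup works here.
    options = PipelineOptions(runner='DataflowRunner', project='my-project')
    pipeline = beam.Pipeline(options=options)
    # ... construct the pipeline ...
    result = pipeline.run()

    # New in this commit: duration is given in milliseconds. The call
    # returns the current PipelineState when the wait ends, which may be
    # non-terminal (e.g. RUNNING) if the timeout expired first.
    state = result.wait_until_finish(duration=60 * 1000)

    if state == PipelineState.RUNNING:
        # Also new in this commit: ask the service to cancel the job.
        result.cancel()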
http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 880901e..813759e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -87,8 +87,18 @@ class DataflowRunner(PipelineRunner):
return 's%s' % self._unique_step_id
@staticmethod
- def poll_for_job_completion(runner, result):
- """Polls for the specified job to finish running (successfully or not)."""
+ def poll_for_job_completion(runner, result, duration):
+ """Polls for the specified job to finish running (successfully or not).
+
+ Updates the result with the new job information before returning.
+
+ Args:
+ runner: DataflowRunner instance to use for polling job state.
+ result: DataflowPipelineResult instance used for job information.
+ duration (int): The time to wait (in milliseconds) for the job to finish.
+ If it is set to :data:`None`, it will wait indefinitely until the job
+ is finished.
+ """
last_message_time = None
last_message_hash = None
@@ -109,6 +119,10 @@ class DataflowRunner(PipelineRunner):
return 1
return 0
+ if duration:
+ start_secs = time.time()
+ duration_secs = duration / 1000
+
job_id = result.job_id()
while True:
response = runner.dataflow_client.get_job(job_id)
@@ -161,6 +175,13 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
+ if duration:
+ passed_secs = time.time() - start_secs
+ if passed_secs > duration_secs:
+ logging.warning('Timing out on waiting for job %s after %d seconds',
+ job_id, passed_secs)
+ break
+
result._job = response
runner.last_error_msg = last_error_msg
@@ -280,7 +301,10 @@ class DataflowRunner(PipelineRunner):
self.dataflow_client = apiclient.DataflowApplicationClient(
pipeline._options)
- # Create the job
+ # Create the job description and send a request to the service. The result
+ # can be None if there is no need to send a request to the service (e.g.
+ # template creation). If a request was sent and failed, then the call will
+ # raise an exception.
result = DataflowPipelineResult(
self.dataflow_client.create_job(self.job), self)
@@ -838,11 +862,23 @@ class DataflowPipelineResult(PipelineResult):
"""Represents the state of a pipeline run on the Dataflow service."""
def __init__(self, job, runner):
- """Job is a Job message from the Dataflow API."""
+ """Initialize a new DataflowPipelineResult instance.
+
+ Args:
+ job: Job message from the Dataflow API. Could be :data:`None` if a job
+ request was not sent to the Dataflow service (e.g. template jobs).
+ runner: DataflowRunner instance.
+ """
self._job = job
self._runner = runner
self.metric_results = None
+ def _update_job(self):
+ # We need the job id to be able to update job information. There is no need
+ # to update the job if we are in a known terminal state.
+ if self.has_job and not self._is_in_terminal_state():
+ self._job = self._runner.dataflow_client.get_job(self.job_id())
+
def job_id(self):
return self._job.id
@@ -863,7 +899,12 @@ class DataflowPipelineResult(PipelineResult):
if not self.has_job:
return PipelineState.UNKNOWN
+ self._update_job()
+
values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+ # TODO: Move this table to another location.
+ # Ordered by the enum values.
api_jobstate_map = {
values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
values_enum.JOB_STATE_STOPPED: PipelineState.STOPPED,
@@ -874,6 +915,8 @@ class DataflowPipelineResult(PipelineResult):
values_enum.JOB_STATE_UPDATED: PipelineState.UPDATED,
values_enum.JOB_STATE_DRAINING: PipelineState.DRAINING,
values_enum.JOB_STATE_DRAINED: PipelineState.DRAINED,
+ values_enum.JOB_STATE_PENDING: PipelineState.PENDING,
+ values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
}
return (api_jobstate_map[self._job.currentState] if self._job.currentState
@@ -883,21 +926,20 @@ class DataflowPipelineResult(PipelineResult):
if not self.has_job:
return True
- return self.state in [
- PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
- PipelineState.CANCELLED, PipelineState.DRAINED]
+ values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+ return self._job.currentState in [
+ values_enum.JOB_STATE_STOPPED, values_enum.JOB_STATE_DONE,
+ values_enum.JOB_STATE_FAILED, values_enum.JOB_STATE_CANCELLED,
+ values_enum.JOB_STATE_DRAINED]
def wait_until_finish(self, duration=None):
if not self._is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
- if duration:
- raise NotImplementedError(
- 'DataflowRunner does not support duration argument.')
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
- args=(self._runner, self))
+ args=(self._runner, self, duration))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
@@ -906,14 +948,40 @@ class DataflowPipelineResult(PipelineResult):
thread.start()
while thread.isAlive():
time.sleep(5.0)
- if self.state != PipelineState.DONE:
- # TODO(BEAM-1290): Consider converting this to an error log based on the
- # resolution of the issue.
+
+ terminated = self._is_in_terminal_state()
+ assert duration or terminated, (
+ 'Job did not reach a terminal state after waiting indefinitely.')
+
+ if terminated and self.state != PipelineState.DONE:
+ # TODO(BEAM-1290): Consider converting this to an error log based on
+ # the resolution of the issue.
raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
return self.state
+ def cancel(self):
+ if not self.has_job:
+ raise IOError('Failed to get the Dataflow job id.')
+
+ self._update_job()
+
+ if self._is_in_terminal_state():
+ logging.warning(
+ 'Cancel failed because job %s is already terminated in state %s.',
+ self.job_id(), self.state)
+ else:
+ if not self._runner.dataflow_client.modify_job_state(
+ self.job_id(), 'JOB_STATE_CANCELLED'):
+ cancel_failed_message = (
+ 'Failed to cancel job %s, please go to the Developers Console to '
+ 'cancel it manually.') % self.job_id()
+ logging.error(cancel_failed_message)
+ raise DataflowRuntimeException(cancel_failed_message, self)
+
+ return self.state
+
def __str__(self):
return '<%s %s %s>' % (
self.__class__.__name__,
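The duration handling above reduces to a simple pattern: convert the
millisecond argument to seconds once, then compare elapsed wall time against
it on every poll. A standalone sketch of that pattern, with a stand-in
is_terminal() probe in place of the Dataflow job query:

    import logging
    import time

    def wait_with_timeout(is_terminal, duration=None, poll_interval=1.0):
        """Polls is_terminal() until it is True or `duration` ms elapse."""
        if duration:
            start_secs = time.time()
            duration_secs = duration / 1000.0
        while not is_terminal():
            if duration:
                passed_secs = time.time() - start_secs
                if passed_secs > duration_secs:
                    logging.warning('Timed out after %d seconds', passed_secs)
                    return False  # the caller decides what a timeout means
            time.sleep(poll_interval)
        return True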
http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 80414d6..8e708e6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -35,6 +35,7 @@ from apache_beam.runners import TestDataflowRunner
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.runner import PipelineState
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.core import _GroupByKeyOnly
@@ -68,13 +69,17 @@ class DataflowRunnerTest(unittest.TestCase):
class MockDataflowRunner(object):
- def __init__(self, final_state):
+ def __init__(self, states):
self.dataflow_client = mock.MagicMock()
self.job = mock.MagicMock()
self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+ self._states = states
+ self._next_state_index = 0
def get_job_side_effect(*args, **kwargs):
- self.job.currentState = final_state
+ self.job.currentState = self._states[self._next_state_index]
+ if self._next_state_index < (len(self._states) - 1):
+ self._next_state_index += 1
return mock.DEFAULT
self.dataflow_client.get_job = mock.MagicMock(
@@ -84,14 +89,79 @@ class DataflowRunnerTest(unittest.TestCase):
with self.assertRaisesRegexp(
DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
- failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+ failed_runner = MockDataflowRunner([values_enum.JOB_STATE_FAILED])
failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
failed_result.wait_until_finish()
- succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+ succeeded_runner = MockDataflowRunner([values_enum.JOB_STATE_DONE])
succeeded_result = DataflowPipelineResult(
succeeded_runner.job, succeeded_runner)
- succeeded_result.wait_until_finish()
+ result = succeeded_result.wait_until_finish()
+ self.assertEqual(result, PipelineState.DONE)
+
+ @mock.patch('time.time', mock.MagicMock(side_effect=[1, 2, 3]))
+ def _duration_succeeded():
+ duration_succeeded_runner = MockDataflowRunner(
+ [values_enum.JOB_STATE_RUNNING, values_enum.JOB_STATE_DONE])
+ duration_succeeded_result = DataflowPipelineResult(
+ duration_succeeded_runner.job, duration_succeeded_runner)
+ result = duration_succeeded_result.wait_until_finish(5)
+ self.assertEqual(result, PipelineState.DONE)
+ _duration_succeeded()
+
+ @mock.patch('time.time', mock.MagicMock(side_effect=[1, 10, 20]))
+ def _duration_timedout():
+ duration_timedout_runner = MockDataflowRunner(
+ [values_enum.JOB_STATE_RUNNING])
+ duration_timedout_result = DataflowPipelineResult(
+ duration_timedout_runner.job, duration_timedout_runner)
+ result = duration_timedout_result.wait_until_finish(5)
+ self.assertEqual(result, PipelineState.RUNNING)
+ _duration_timedout()
+
+ @mock.patch('time.time', mock.MagicMock(side_effect=[1, 2, 3]))
+ def _duration_failed():
+ with self.assertRaisesRegexp(
+ DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
+ duration_failed_runner = MockDataflowRunner(
+ [values_enum.JOB_STATE_FAILED])
+ duration_failed_result = DataflowPipelineResult(
+ duration_failed_runner.job, duration_failed_runner)
+ duration_failed_result.wait_until_finish(5)
+ _duration_failed()
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_cancel(self, patched_time_sleep):
+ values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+ class MockDataflowRunner(object):
+
+ def __init__(self, state, cancel_result):
+ self.dataflow_client = mock.MagicMock()
+ self.job = mock.MagicMock()
+ self.job.currentState = state
+
+ self.dataflow_client.get_job = mock.MagicMock(return_value=self.job)
+ self.dataflow_client.modify_job_state = mock.MagicMock(
+ return_value=cancel_result)
+ self.dataflow_client.list_messages = mock.MagicMock(
+ return_value=([], None))
+
+ with self.assertRaisesRegexp(
+ DataflowRuntimeException, 'Failed to cancel job'):
+ failed_runner = MockDataflowRunner(values_enum.JOB_STATE_RUNNING, False)
+ failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+ failed_result.cancel()
+
+ succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_RUNNING, True)
+ succeeded_result = DataflowPipelineResult(
+ succeeded_runner.job, succeeded_runner)
+ succeeded_result.cancel()
+
+ terminal_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE, False)
+ terminal_result = DataflowPipelineResult(
+ terminal_runner.job, terminal_runner)
+ terminal_result.cancel()
def test_create_runner(self):
self.assertTrue(
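The tests above lean on two mock tricks: a side_effect list that scripts the
sequence of job states returned by get_job, and a patched time.time that
returns scripted instants so the timeout branch is deterministic. A
self-contained sketch of the pattern (names are illustrative only):

    import time
    import mock  # the standalone mock package; unittest.mock on Python 3

    # Each call to get_job() returns the next scripted state.
    get_job = mock.MagicMock(side_effect=['RUNNING', 'RUNNING', 'DONE'])

    with mock.patch('time.time', mock.MagicMock(side_effect=[1, 10, 20])):
        start = time.time()  # scripted: returns 1, no real clock involved
        while True:
            state = get_job()
            # The next time.time() call returns 10, so the 5-second budget
            # is exceeded without any actual waiting.
            if state == 'DONE' or time.time() - start > 5:
                break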
http://git-wip-us.apache.org/repos/asf/beam/blob/8f71dc41/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index a3c6b34..43ee27b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -298,6 +298,9 @@ class PipelineState(object):
UPDATED = 'UPDATED' # replaced by another job (terminal state)
DRAINING = 'DRAINING' # still processing, no longer reading data
DRAINED = 'DRAINED' # draining completed (terminal state)
+ PENDING = 'PENDING' # the job has been created but is not yet running
+ CANCELLING = 'CANCELLING' # job has been explicitly cancelled and is
+ # in the process of stopping
class PipelineResult(object):
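With the two new states, PENDING and CANCELLING join RUNNING and DRAINING as
in-flight values. A small sketch (an assumption for illustration, mirroring
the terminal-state list used by DataflowPipelineResult above):

    from apache_beam.runners.runner import PipelineState

    TERMINAL_STATES = frozenset([
        PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
        PipelineState.CANCELLED, PipelineState.DRAINED])

    def is_terminal(state):
        # PENDING and CANCELLING are not terminal: the job will still move
        # on to RUNNING or to one of the terminal states, so keep polling.
        return state in TERMINAL_STATES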
[2/2] beam git commit: This closes #3757
Posted by al...@apache.org.
This closes #3757
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d50c9644
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d50c9644
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d50c9644
Branch: refs/heads/master
Commit: d50c9644e95fdd99fcf12c1afdf2822fe4b2b3e2
Parents: 456dcc1 8f71dc4
Author: Ahmet Altay <al...@google.com>
Authored: Sat Aug 26 20:27:28 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 26 20:27:28 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_runner.py | 96 +++++++++++++++++---
.../runners/dataflow/dataflow_runner_test.py | 80 +++++++++++++++-
sdks/python/apache_beam/runners/runner.py | 3 +
3 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------