You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:34 UTC
[2/6] beam git commit: Implement wait_until_finish method for
existing runners.
Implement wait_until_finish method for existing runners.
Also defines the cancel() method (not yet implemented) and updates existing
usages to call wait_until_finish() instead of relying on blocking runners.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74dda50e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74dda50e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74dda50e
Branch: refs/heads/python-sdk
Commit: 74dda50e64a93ab3c147ac24f7436ef04467aa27
Parents: f25c0e4
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jan 9 18:23:20 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 2 +-
.../apache_beam/runners/dataflow_runner.py | 60 +++++++++++++-------
.../apache_beam/runners/direct/direct_runner.py | 15 ++---
sdks/python/apache_beam/runners/runner.py | 35 +++++++++++-
sdks/python/apache_beam/runners/runner_test.py | 1 +
.../apache_beam/runners/template_runner_test.py | 4 +-
.../runners/test/test_dataflow_runner.py | 4 +-
.../apache_beam/utils/pipeline_options.py | 3 +-
sdks/python/run_postcommit.sh | 2 +-
9 files changed, 90 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6517960..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -167,7 +167,7 @@ class Pipeline(object):
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
- self.run()
+ self.run().wait_until_finish()
def visit(self, visitor):
"""Visits depth-first every node of a pipeline's DAG.
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3505acc..330472b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,7 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
- runner.result = DataflowPipelineResult(response)
+ runner.result = DataflowPipelineResult(response, runner)
runner.last_error_msg = last_error_msg
def run(self, pipeline):
@@ -176,23 +176,11 @@ class DataflowRunner(PipelineRunner):
# Create the job
self.result = DataflowPipelineResult(
- self.dataflow_client.create_job(self.job))
+ self.dataflow_client.create_job(self.job), self)
if self.result.has_job and self.blocking:
- thread = threading.Thread(
- target=DataflowRunner.poll_for_job_completion,
- args=(self, self.result.job_id()))
- # Mark the thread as a daemon thread so a keyboard interrupt on the main
- # thread will terminate everything. This is also the reason we will not
- # use thread.join() to wait for the polling thread.
- thread.daemon = True
- thread.start()
- while thread.isAlive():
- time.sleep(5.0)
- if self.result.current_state() != PipelineState.DONE:
- raise DataflowRuntimeException(
- 'Dataflow pipeline failed:\n%s'
- % getattr(self, 'last_error_msg', None), self.result)
+ self.result.wait_until_finish()
+
return self.result
def _get_typehint_based_encoding(self, typehint, window_coder):
@@ -651,9 +639,10 @@ class DataflowRunner(PipelineRunner):
class DataflowPipelineResult(PipelineResult):
"""Represents the state of a pipeline run on the Dataflow service."""
- def __init__(self, job):
+ def __init__(self, job, runner):
"""Job is a Job message from the Dataflow API."""
self._job = job
+ self._runner = runner
def job_id(self):
return self._job.id
@@ -662,12 +651,16 @@ class DataflowPipelineResult(PipelineResult):
def has_job(self):
return self._job is not None
- def current_state(self):
+ @property
+ def state(self):
"""Return the current state of the remote job.
Returns:
A PipelineState object.
"""
+ if not self.has_job:
+ return PipelineState.UNKNOWN
+
values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
api_jobstate_map = {
values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
@@ -684,11 +677,40 @@ class DataflowPipelineResult(PipelineResult):
return (api_jobstate_map[self._job.currentState] if self._job.currentState
else PipelineState.UNKNOWN)
+ def _is_in_terminal_state(self):
+ if not self.has_job:
+ return True
+
+ return self.state in [
+ PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
+ PipelineState.CANCELLED, PipelineState.DRAINED]
+
+ def wait_until_finish(self, duration=None):
+ if not self._is_in_terminal_state():
+ if not self.has_job:
+ raise IOError('Failed to get the Dataflow job id.')
+ if duration:
+ raise NotImplementedError(
+ 'DataflowRunner does not support duration argument.')
+
+ thread = threading.Thread(
+ target=DataflowRunner.poll_for_job_completion,
+ args=(self._runner, self.job_id()))
+
+ # Mark the thread as a daemon thread so a keyboard interrupt on the main
+ # thread will terminate everything. This is also the reason we will not
+ # use thread.join() to wait for the polling thread.
+ thread.daemon = True
+ thread.start()
+ while thread.isAlive():
+ time.sleep(5.0)
+ return self.state
+
def __str__(self):
return '<%s %s %s>' % (
self.__class__.__name__,
self.job_id(),
- self.current_state())
+ self.state)
def __repr__(self):
return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index a5c616b..dc2668d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -43,7 +43,7 @@ class DirectRunner(PipelineRunner):
def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
- # TODO: Move imports to top. Pipeline <-> Runner dependecy cause problems
+ # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \
@@ -76,12 +76,10 @@ class DirectRunner(PipelineRunner):
executor.start(self.visitor.root_transforms)
result = DirectPipelineResult(executor, evaluation_context)
- # TODO(altay): If blocking:
- # Block until the pipeline completes. This call will return after the
- # pipeline was fully terminated (successfully or with a failure).
- result.await_completion()
-
if self._cache:
+ # We are running in eager mode, block until the pipeline execution
+ # completes in order to have full results in the cache.
+ result.wait_until_finish()
self._cache.finalize()
return result
@@ -141,8 +139,11 @@ class DirectPipelineResult(PipelineResult):
def _is_in_terminal_state(self):
return self._state is not PipelineState.RUNNING
- def await_completion(self):
+ def wait_until_finish(self, duration=None):
if not self._is_in_terminal_state():
+ if duration:
+ raise NotImplementedError(
+ 'DirectRunner does not support duration argument.')
try:
self._executor.await_completion()
self._state = PipelineState.DONE
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 3dc4d28..1a50df4 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -287,7 +287,7 @@ class PValueCache(object):
class PipelineState(object):
- """State of the Pipeline, as returned by PipelineResult.current_state().
+ """State of the Pipeline, as returned by PipelineResult.state.
This is meant to be the union of all the states any runner can put a
pipeline in. Currently, it represents the values of the dataflow
@@ -310,10 +310,39 @@ class PipelineResult(object):
def __init__(self, state):
self._state = state
- def current_state(self):
- """Return the current state of running the pipeline."""
+ @property
+ def state(self):
+ """Return the current state of the pipeline execution."""
return self._state
+ def wait_until_finish(self, duration=None):
+ """Waits until the pipeline finishes and returns the final status.
+
+ Args:
+ duration: The time to wait (in milliseconds) for job to finish. If it is
+ set to None, it will wait indefinitely until the job is finished.
+
+ Raises:
+ IOError: If there is a persistent problem getting job information.
+ NotImplementedError: If the runner does not support this operation.
+
+ Returns:
+ The final state of the pipeline, or None on timeout.
+ """
+ raise NotImplementedError
+
+ def cancel(self):
+ """Cancels the pipeline execution.
+
+ Raises:
+ IOError: If there is a persistent problem getting job information.
+ NotImplementedError: If the runner does not support this operation.
+
+ Returns:
+ The final state of the pipeline.
+ """
+ raise NotImplementedError
+
# pylint: disable=unused-argument
def aggregated_values(self, aggregator_or_name):
"""Return a dict of step names to values of the Aggregator."""
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ea86061..2b6c316 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -161,6 +161,7 @@ class RunnerTest(unittest.TestCase):
(p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
| 'do' >> beam.ParDo(MyDoFn()))
result = p.run()
+ result.wait_until_finish()
metrics = result.metrics().query()
namespace = '{}.{}'.format(MyDoFn.__module__,
MyDoFn.__name__)
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 457022d..af7f2c6 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -55,7 +55,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
'--no_auth=True']))
pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
- pipeline.run()
+ pipeline.run().wait_until_finish()
with open(dummy_file_name) as template_file:
saved_job_dict = json.load(template_file)
self.assertEqual(
@@ -81,7 +81,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
remote_runner.job = apiclient.Job(pipeline.options)
with self.assertRaises(IOError):
- pipeline.run()
+ pipeline.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index 77655bd..823e534 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -25,14 +25,16 @@ from apache_beam.utils.pipeline_options import TestOptions
class TestDataflowRunner(DataflowRunner):
def __init__(self):
- super(TestDataflowRunner, self).__init__(blocking=True)
+ super(TestDataflowRunner, self).__init__()
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
self.result = super(TestDataflowRunner, self).run(pipeline)
+ self.result.wait_until_finish()
options = pipeline.options.view_as(TestOptions)
if options.on_success_matcher:
from hamcrest import assert_that as hc_assert_that
hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+
return self.result
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 9f57ee7..16b1640 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -182,8 +182,7 @@ class StandardOptions(PipelineOptions):
parser.add_argument(
'--runner',
help=('Pipeline runner used to execute the workflow. Valid values are '
- 'DirectRunner, DataflowRunner, '
- 'and BlockingDataflowRunner.'))
+ 'DirectRunner, DataflowRunner.'))
# Whether to enable streaming mode.
parser.add_argument('--streaming',
default=False,
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 67a257e..2e419a5 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -74,7 +74,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS"
python setup.py nosetests \
-a ValidatesRunner --test-pipeline-options=" \
- --runner=BlockingDataflowRunner \
+ --runner=TestDataflowRunner \
--project=$PROJECT \
--staging_location=$GCS_LOCATION/staging-validatesrunner-test \
--temp_location=$GCS_LOCATION/temp-validatesrunner-test \