You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:34 UTC

[2/6] beam git commit: Implement wait_until_finish method for existing runners.

Implement wait_until_finish method for existing runners.

Also defines a cancel() method, which is not implemented and raises
NotImplementedError, and updates existing usages to call
wait_until_finish() instead of relying on blocking runners.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74dda50e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74dda50e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74dda50e

Branch: refs/heads/python-sdk
Commit: 74dda50e64a93ab3c147ac24f7436ef04467aa27
Parents: f25c0e4
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jan 9 18:23:20 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  2 +-
 .../apache_beam/runners/dataflow_runner.py      | 60 +++++++++++++-------
 .../apache_beam/runners/direct/direct_runner.py | 15 ++---
 sdks/python/apache_beam/runners/runner.py       | 35 +++++++++++-
 sdks/python/apache_beam/runners/runner_test.py  |  1 +
 .../apache_beam/runners/template_runner_test.py |  4 +-
 .../runners/test/test_dataflow_runner.py        |  4 +-
 .../apache_beam/utils/pipeline_options.py       |  3 +-
 sdks/python/run_postcommit.sh                   |  2 +-
 9 files changed, 90 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6517960..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -167,7 +167,7 @@ class Pipeline(object):
 
   def __exit__(self, exc_type, exc_val, exc_tb):
     if not exc_type:
-      self.run()
+      self.run().wait_until_finish()
 
   def visit(self, visitor):
     """Visits depth-first every node of a pipeline's DAG.

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3505acc..330472b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,7 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
-    runner.result = DataflowPipelineResult(response)
+    runner.result = DataflowPipelineResult(response, runner)
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -176,23 +176,11 @@ class DataflowRunner(PipelineRunner):
 
     # Create the job
     self.result = DataflowPipelineResult(
-        self.dataflow_client.create_job(self.job))
+        self.dataflow_client.create_job(self.job), self)
 
     if self.result.has_job and self.blocking:
-      thread = threading.Thread(
-          target=DataflowRunner.poll_for_job_completion,
-          args=(self, self.result.job_id()))
-      # Mark the thread as a daemon thread so a keyboard interrupt on the main
-      # thread will terminate everything. This is also the reason we will not
-      # use thread.join() to wait for the polling thread.
-      thread.daemon = True
-      thread.start()
-      while thread.isAlive():
-        time.sleep(5.0)
-      if self.result.current_state() != PipelineState.DONE:
-        raise DataflowRuntimeException(
-            'Dataflow pipeline failed:\n%s'
-            % getattr(self, 'last_error_msg', None), self.result)
+      self.result.wait_until_finish()
+
     return self.result
 
   def _get_typehint_based_encoding(self, typehint, window_coder):
@@ -651,9 +639,10 @@ class DataflowRunner(PipelineRunner):
 class DataflowPipelineResult(PipelineResult):
   """Represents the state of a pipeline run on the Dataflow service."""
 
-  def __init__(self, job):
+  def __init__(self, job, runner):
     """Job is a Job message from the Dataflow API."""
     self._job = job
+    self._runner = runner
 
   def job_id(self):
     return self._job.id
@@ -662,12 +651,16 @@ class DataflowPipelineResult(PipelineResult):
   def has_job(self):
     return self._job is not None
 
-  def current_state(self):
+  @property
+  def state(self):
     """Return the current state of the remote job.
 
     Returns:
       A PipelineState object.
     """
+    if not self.has_job:
+      return PipelineState.UNKNOWN
+
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
     api_jobstate_map = {
         values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
@@ -684,11 +677,40 @@ class DataflowPipelineResult(PipelineResult):
     return (api_jobstate_map[self._job.currentState] if self._job.currentState
             else PipelineState.UNKNOWN)
 
+  def _is_in_terminal_state(self):
+    if not self.has_job:
+      return True
+
+    return self.state in [
+        PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
+        PipelineState.CANCELLED, PipelineState.DRAINED]
+
+  def wait_until_finish(self, duration=None):
+    if not self._is_in_terminal_state():
+      if not self.has_job:
+        raise IOError('Failed to get the Dataflow job id.')
+      if duration:
+        raise NotImplementedError(
+            'DataflowRunner does not support duration argument.')
+
+      thread = threading.Thread(
+          target=DataflowRunner.poll_for_job_completion,
+          args=(self._runner, self.job_id()))
+
+      # Mark the thread as a daemon thread so a keyboard interrupt on the main
+      # thread will terminate everything. This is also the reason we will not
+      # use thread.join() to wait for the polling thread.
+      thread.daemon = True
+      thread.start()
+      while thread.isAlive():
+        time.sleep(5.0)
+    return self.state
+
   def __str__(self):
     return '<%s %s %s>' % (
         self.__class__.__name__,
         self.job_id(),
-        self.current_state())
+        self.state)
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index a5c616b..dc2668d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -43,7 +43,7 @@ class DirectRunner(PipelineRunner):
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
-    # TODO: Move imports to top. Pipeline <-> Runner dependecy cause problems
+    # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
     # with resolving imports when they are at top.
     # pylint: disable=wrong-import-position
     from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \
@@ -76,12 +76,10 @@ class DirectRunner(PipelineRunner):
     executor.start(self.visitor.root_transforms)
     result = DirectPipelineResult(executor, evaluation_context)
 
-    # TODO(altay): If blocking:
-    # Block until the pipeline completes. This call will return after the
-    # pipeline was fully terminated (successfully or with a failure).
-    result.await_completion()
-
     if self._cache:
+      # We are running in eager mode, block until the pipeline execution
+      # completes in order to have full results in the cache.
+      result.wait_until_finish()
       self._cache.finalize()
 
     return result
@@ -141,8 +139,11 @@ class DirectPipelineResult(PipelineResult):
   def _is_in_terminal_state(self):
     return self._state is not PipelineState.RUNNING
 
-  def await_completion(self):
+  def wait_until_finish(self, duration=None):
     if not self._is_in_terminal_state():
+      if duration:
+        raise NotImplementedError(
+            'DirectRunner does not support duration argument.')
       try:
         self._executor.await_completion()
         self._state = PipelineState.DONE

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 3dc4d28..1a50df4 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -287,7 +287,7 @@ class PValueCache(object):
 
 
 class PipelineState(object):
-  """State of the Pipeline, as returned by PipelineResult.current_state().
+  """State of the Pipeline, as returned by PipelineResult.state.
 
   This is meant to be the union of all the states any runner can put a
   pipeline in.  Currently, it represents the values of the dataflow
@@ -310,10 +310,39 @@ class PipelineResult(object):
   def __init__(self, state):
     self._state = state
 
-  def current_state(self):
-    """Return the current state of running the pipeline."""
+  @property
+  def state(self):
+    """Return the current state of the pipeline execution."""
     return self._state
 
+  def wait_until_finish(self, duration=None):
+    """Waits until the pipeline finishes and returns the final status.
+
+    Args:
+      duration: The time to wait (in milliseconds) for job to finish. If it is
+        set to None, it will wait indefinitely until the job is finished.
+
+    Raises:
+      IOError: If there is a persistent problem getting job information.
+      NotImplementedError: If the runner does not support this operation.
+
+    Returns:
+      The final state of the pipeline, or None on timeout.
+    """
+    raise NotImplementedError
+
+  def cancel(self):
+    """Cancels the pipeline execution.
+
+    Raises:
+      IOError: If there is a persistent problem getting job information.
+      NotImplementedError: If the runner does not support this operation.
+
+    Returns:
+      The final state of the pipeline.
+    """
+    raise NotImplementedError
+
   # pylint: disable=unused-argument
   def aggregated_values(self, aggregator_or_name):
     """Return a dict of step names to values of the Aggregator."""

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ea86061..2b6c316 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -161,6 +161,7 @@ class RunnerTest(unittest.TestCase):
     (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
      | 'do' >> beam.ParDo(MyDoFn()))
     result = p.run()
+    result.wait_until_finish()
     metrics = result.metrics().query()
     namespace = '{}.{}'.format(MyDoFn.__module__,
                                MyDoFn.__name__)

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 457022d..af7f2c6 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -55,7 +55,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
                             '--no_auth=True']))
 
     pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
-    pipeline.run()
+    pipeline.run().wait_until_finish()
     with open(dummy_file_name) as template_file:
       saved_job_dict = json.load(template_file)
       self.assertEqual(
@@ -81,7 +81,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(pipeline.options)
 
     with self.assertRaises(IOError):
-      pipeline.run()
+      pipeline.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index 77655bd..823e534 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -25,14 +25,16 @@ from apache_beam.utils.pipeline_options import TestOptions
 class TestDataflowRunner(DataflowRunner):
 
   def __init__(self):
-    super(TestDataflowRunner, self).__init__(blocking=True)
+    super(TestDataflowRunner, self).__init__()
 
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     self.result = super(TestDataflowRunner, self).run(pipeline)
+    self.result.wait_until_finish()
 
     options = pipeline.options.view_as(TestOptions)
     if options.on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+
     return self.result

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 9f57ee7..16b1640 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -182,8 +182,7 @@ class StandardOptions(PipelineOptions):
     parser.add_argument(
         '--runner',
         help=('Pipeline runner used to execute the workflow. Valid values are '
-              'DirectRunner, DataflowRunner, '
-              'and BlockingDataflowRunner.'))
+              'DirectRunner, DataflowRunner.'))
     # Whether to enable streaming mode.
     parser.add_argument('--streaming',
                         default=False,

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 67a257e..2e419a5 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -74,7 +74,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
 echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS"
 python setup.py nosetests \
   -a ValidatesRunner --test-pipeline-options=" \
-    --runner=BlockingDataflowRunner \
+    --runner=TestDataflowRunner \
     --project=$PROJECT \
     --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
     --temp_location=$GCS_LOCATION/temp-validatesrunner-test \