You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/21 00:46:46 UTC

[1/3] beam git commit: Make TestPipeline.run fail when the underlying execution fails.

Repository: beam
Updated Branches:
  refs/heads/python-sdk c57c66ed4 -> 82599a241


Make TestPipeline.run fail when the underlying execution fails.

Also, DataflowRunner's wait_until_finish method will now log the last error
message when the pipeline does not finish in the DONE state.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa3a2cb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa3a2cb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa3a2cb3

Branch: refs/heads/python-sdk
Commit: aa3a2cb326a5f761eba9fe87fe7d57da9ce78555
Parents: c57c66e
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jan 19 16:13:07 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:20 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py    | 12 ++++--------
 sdks/python/apache_beam/test_pipeline.py              |  5 ++++-
 sdks/python/apache_beam/transforms/aggregator_test.py |  2 +-
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 330472b..fd22753 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -704,6 +704,10 @@ class DataflowPipelineResult(PipelineResult):
       thread.start()
       while thread.isAlive():
         time.sleep(5.0)
+      if self.state != PipelineState.DONE:
+        logging.error(
+            'Dataflow pipeline failed. State: %s, Error:\n%s',
+            self.state, getattr(self._runner, 'last_error_msg', None))
     return self.state
 
   def __str__(self):
@@ -714,11 +718,3 @@ class DataflowPipelineResult(PipelineResult):
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
-
-
-class DataflowRuntimeException(Exception):
-  """Indicates an error has occurred in running this pipeline."""
-
-  def __init__(self, msg, result):
-    super(DataflowRuntimeException, self).__init__(msg)
-    self.result = result

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index c29a879..7d85af9 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,6 +22,7 @@ import shlex
 
 from apache_beam.internal import pickler
 from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
 from apache_beam.utils.pipeline_options import PipelineOptions
 from nose.plugins.skip import SkipTest
 
@@ -89,7 +90,9 @@ class TestPipeline(Pipeline):
   def run(self):
     result = super(TestPipeline, self).run()
     if self.blocking:
-      result.wait_until_finish()
+      state = result.wait_until_finish()
+      assert state == PipelineState.DONE, "Pipeline execution failed."
+
     return result
 
   def _parse_test_option_args(self, argv):

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index d493c46..a2a4144 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -20,9 +20,9 @@
 import unittest
 
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import combiners
 from apache_beam.transforms.aggregator import Aggregator
-from apache_beam.test_pipeline import TestPipeline
 
 
 class AggregatorTest(unittest.TestCase):


[2/3] beam git commit: DataflowRunner will raise an exception on failures.

Posted by ro...@apache.org.
DataflowRunner will raise an exception on failures.

This restores the earlier behavior of raising DataflowRuntimeException, which the previous commit had removed in favor of an error log.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed81a26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed81a26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed81a26

Branch: refs/heads/python-sdk
Commit: 1ed81a2655a2c98655d8e5ce965eb72681388926
Parents: aa3a2cb
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 11:06:38 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:21 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      | 17 ++++--
 .../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index fd22753..bd25dbf 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,6 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
-    runner.result = DataflowPipelineResult(response, runner)
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -705,9 +704,11 @@ class DataflowPipelineResult(PipelineResult):
       while thread.isAlive():
         time.sleep(5.0)
       if self.state != PipelineState.DONE:
-        logging.error(
-            'Dataflow pipeline failed. State: %s, Error:\n%s',
-            self.state, getattr(self._runner, 'last_error_msg', None))
+        # TODO(BEAM-1290): Consider converting this to an error log based on the
+        # resolution of the issue.
+        raise DataflowRuntimeException(
+            'Dataflow pipeline failed. State: %s, Error:\n%s' %
+            (self.state, getattr(self._runner, 'last_error_msg', None)), self)
     return self.state
 
   def __str__(self):
@@ -718,3 +719,11 @@ class DataflowPipelineResult(PipelineResult):
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
+
+
+class DataflowRuntimeException(Exception):
+  """Indicates an error has occurred in running this pipeline."""
+
+  def __init__(self, msg, result):
+    super(DataflowRuntimeException, self).__init__(msg)
+    self.result = result

http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DataflowRunner class."""
+
+import unittest
+import mock
+
+from apache_beam.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow_runner import DataflowPipelineResult
+
+
+class DataflowRunnerTest(unittest.TestCase):
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_until_finish(self, patched_time_sleep):
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+    class MockDataflowRunner(object):
+
+      def __init__(self, final_state):
+        self.dataflow_client = mock.MagicMock()
+        self.job = mock.MagicMock()
+        self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+
+        def get_job_side_effect(*args, **kwargs):
+          self.job.currentState = final_state
+          return mock.DEFAULT
+
+        self.dataflow_client.get_job = mock.MagicMock(
+            return_value=self.job, side_effect=get_job_side_effect)
+        self.dataflow_client.list_messages = mock.MagicMock(
+            return_value=([], None))
+
+    with self.assertRaises(DataflowRuntimeException) as e:
+      failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+      failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+      failed_result.wait_until_finish()
+    self.assertTrue(
+        'Dataflow pipeline failed. State: FAILED' in e.exception.message)
+
+    succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+    succeeded_result = DataflowPipelineResult(
+        succeeded_runner.job, succeeded_runner)
+    succeeded_result.wait_until_finish()
+
+
+if __name__ == '__main__':
+  unittest.main()


[3/3] beam git commit: Closes #1802

Posted by ro...@apache.org.
Closes #1802


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82599a24
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82599a24
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82599a24

Branch: refs/heads/python-sdk
Commit: 82599a241454ba53640c0da4cae3f60ea7668e3e
Parents: c57c66e 1ed81a2
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jan 20 16:46:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:22 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  7 ++-
 .../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
 sdks/python/apache_beam/test_pipeline.py        |  5 +-
 .../apache_beam/transforms/aggregator_test.py   |  2 +-
 4 files changed, 75 insertions(+), 3 deletions(-)
----------------------------------------------------------------------