You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/21 00:46:47 UTC
[2/3] beam git commit: DataflowRunner will raise an exception on failures.
DataflowRunner will raise an exception on failures.
This is the same behavior as before.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed81a26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed81a26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed81a26
Branch: refs/heads/python-sdk
Commit: 1ed81a2655a2c98655d8e5ce965eb72681388926
Parents: aa3a2cb
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 11:06:38 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:21 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 17 ++++--
.../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index fd22753..bd25dbf 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,6 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
- runner.result = DataflowPipelineResult(response, runner)
runner.last_error_msg = last_error_msg
def run(self, pipeline):
@@ -705,9 +704,11 @@ class DataflowPipelineResult(PipelineResult):
while thread.isAlive():
time.sleep(5.0)
if self.state != PipelineState.DONE:
- logging.error(
- 'Dataflow pipeline failed. State: %s, Error:\n%s',
- self.state, getattr(self._runner, 'last_error_msg', None))
+ # TODO(BEAM-1290): Consider converting this to an error log based on the
+ # resolution of the issue.
+ raise DataflowRuntimeException(
+ 'Dataflow pipeline failed. State: %s, Error:\n%s' %
+ (self.state, getattr(self._runner, 'last_error_msg', None)), self)
return self.state
def __str__(self):
@@ -718,3 +719,11 @@ class DataflowPipelineResult(PipelineResult):
def __repr__(self):
return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
+
+
+class DataflowRuntimeException(Exception):
+ """Indicates an error has occurred in running this pipeline."""
+
+ def __init__(self, msg, result):
+ super(DataflowRuntimeException, self).__init__(msg)
+ self.result = result
http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DataflowRunner class."""
+
+import unittest
+import mock
+
+from apache_beam.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow_runner import DataflowPipelineResult
+
+
+class DataflowRunnerTest(unittest.TestCase):
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_wait_until_finish(self, patched_time_sleep):
+ values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+ class MockDataflowRunner(object):
+
+ def __init__(self, final_state):
+ self.dataflow_client = mock.MagicMock()
+ self.job = mock.MagicMock()
+ self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+
+ def get_job_side_effect(*args, **kwargs):
+ self.job.currentState = final_state
+ return mock.DEFAULT
+
+ self.dataflow_client.get_job = mock.MagicMock(
+ return_value=self.job, side_effect=get_job_side_effect)
+ self.dataflow_client.list_messages = mock.MagicMock(
+ return_value=([], None))
+
+ with self.assertRaises(DataflowRuntimeException) as e:
+ failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+ failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+ failed_result.wait_until_finish()
+ self.assertTrue(
+ 'Dataflow pipeline failed. State: FAILED' in e.exception.message)
+
+ succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+ succeeded_result = DataflowPipelineResult(
+ succeeded_runner.job, succeeded_runner)
+ succeeded_result.wait_until_finish()
+
+
+if __name__ == '__main__':
+ unittest.main()