You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/21 00:46:46 UTC
[1/3] beam git commit: Make TestPipeline.run fail when the underlying
execution fails.
Repository: beam
Updated Branches:
refs/heads/python-sdk c57c66ed4 -> 82599a241
Make TestPipeline.run fail when the underlying execution fails.
Also, DataflowRunner's wait_until_finish method will now log the last
error message when the pipeline does not finish in the DONE state.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa3a2cb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa3a2cb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa3a2cb3
Branch: refs/heads/python-sdk
Commit: aa3a2cb326a5f761eba9fe87fe7d57da9ce78555
Parents: c57c66e
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jan 19 16:13:07 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:20 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/dataflow_runner.py | 12 ++++--------
sdks/python/apache_beam/test_pipeline.py | 5 ++++-
sdks/python/apache_beam/transforms/aggregator_test.py | 2 +-
3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 330472b..fd22753 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -704,6 +704,10 @@ class DataflowPipelineResult(PipelineResult):
thread.start()
while thread.isAlive():
time.sleep(5.0)
+ if self.state != PipelineState.DONE:
+ logging.error(
+ 'Dataflow pipeline failed. State: %s, Error:\n%s',
+ self.state, getattr(self._runner, 'last_error_msg', None))
return self.state
def __str__(self):
@@ -714,11 +718,3 @@ class DataflowPipelineResult(PipelineResult):
def __repr__(self):
return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
-
-
-class DataflowRuntimeException(Exception):
- """Indicates an error has occurred in running this pipeline."""
-
- def __init__(self, msg, result):
- super(DataflowRuntimeException, self).__init__(msg)
- self.result = result
http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index c29a879..7d85af9 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,6 +22,7 @@ import shlex
from apache_beam.internal import pickler
from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
from apache_beam.utils.pipeline_options import PipelineOptions
from nose.plugins.skip import SkipTest
@@ -89,7 +90,9 @@ class TestPipeline(Pipeline):
def run(self):
result = super(TestPipeline, self).run()
if self.blocking:
- result.wait_until_finish()
+ state = result.wait_until_finish()
+ assert state == PipelineState.DONE, "Pipeline execution failed."
+
return result
def _parse_test_option_args(self, argv):
http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index d493c46..a2a4144 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -20,9 +20,9 @@
import unittest
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import combiners
from apache_beam.transforms.aggregator import Aggregator
-from apache_beam.test_pipeline import TestPipeline
class AggregatorTest(unittest.TestCase):
[2/3] beam git commit: DataflowRunner will raise an exception on
failures.
Posted by ro...@apache.org.
DataflowRunner will raise an exception on failures.
This restores the behavior from before the previous commit (aa3a2cb),
which had replaced the exception with an error log.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed81a26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed81a26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed81a26
Branch: refs/heads/python-sdk
Commit: 1ed81a2655a2c98655d8e5ce965eb72681388926
Parents: aa3a2cb
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 11:06:38 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:21 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 17 ++++--
.../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index fd22753..bd25dbf 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,6 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
- runner.result = DataflowPipelineResult(response, runner)
runner.last_error_msg = last_error_msg
def run(self, pipeline):
@@ -705,9 +704,11 @@ class DataflowPipelineResult(PipelineResult):
while thread.isAlive():
time.sleep(5.0)
if self.state != PipelineState.DONE:
- logging.error(
- 'Dataflow pipeline failed. State: %s, Error:\n%s',
- self.state, getattr(self._runner, 'last_error_msg', None))
+ # TODO(BEAM-1290): Consider converting this to an error log based on the
+ # resolution of the issue.
+ raise DataflowRuntimeException(
+ 'Dataflow pipeline failed. State: %s, Error:\n%s' %
+ (self.state, getattr(self._runner, 'last_error_msg', None)), self)
return self.state
def __str__(self):
@@ -718,3 +719,11 @@ class DataflowPipelineResult(PipelineResult):
def __repr__(self):
return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
+
+
+class DataflowRuntimeException(Exception):
+ """Indicates an error has occurred in running this pipeline."""
+
+ def __init__(self, msg, result):
+ super(DataflowRuntimeException, self).__init__(msg)
+ self.result = result
http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DataflowRunner class."""
+
+import unittest
+import mock
+
+from apache_beam.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow_runner import DataflowPipelineResult
+
+
+class DataflowRunnerTest(unittest.TestCase):
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_wait_until_finish(self, patched_time_sleep):
+ values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+ class MockDataflowRunner(object):
+
+ def __init__(self, final_state):
+ self.dataflow_client = mock.MagicMock()
+ self.job = mock.MagicMock()
+ self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+
+ def get_job_side_effect(*args, **kwargs):
+ self.job.currentState = final_state
+ return mock.DEFAULT
+
+ self.dataflow_client.get_job = mock.MagicMock(
+ return_value=self.job, side_effect=get_job_side_effect)
+ self.dataflow_client.list_messages = mock.MagicMock(
+ return_value=([], None))
+
+ with self.assertRaises(DataflowRuntimeException) as e:
+ failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+ failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+ failed_result.wait_until_finish()
+ self.assertTrue(
+ 'Dataflow pipeline failed. State: FAILED' in e.exception.message)
+
+ succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+ succeeded_result = DataflowPipelineResult(
+ succeeded_runner.job, succeeded_runner)
+ succeeded_result.wait_until_finish()
+
+
+if __name__ == '__main__':
+ unittest.main()
[3/3] beam git commit: Closes #1802
Posted by ro...@apache.org.
Closes #1802
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82599a24
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82599a24
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82599a24
Branch: refs/heads/python-sdk
Commit: 82599a241454ba53640c0da4cae3f60ea7668e3e
Parents: c57c66e 1ed81a2
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jan 20 16:46:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 16:46:22 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 7 ++-
.../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
sdks/python/apache_beam/test_pipeline.py | 5 +-
.../apache_beam/transforms/aggregator_test.py | 2 +-
4 files changed, 75 insertions(+), 3 deletions(-)
----------------------------------------------------------------------