You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/10 00:50:04 UTC
[1/2] beam git commit: [BEAM-1584] Add file clean up util for
integration tests
Repository: beam
Updated Branches:
refs/heads/master 7a34042c0 -> 0a5deeac5
[BEAM-1584] Add file clean up util for integration tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a17978a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a17978a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a17978a7
Branch: refs/heads/master
Commit: a17978a7825433342a7f2371f80d1612c8cda055
Parents: 7a34042
Author: Mark Liu <ma...@google.com>
Authored: Thu Aug 3 16:33:07 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Aug 9 17:49:52 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 4 ++
sdks/python/apache_beam/testing/test_utils.py | 18 ++++++
.../apache_beam/testing/test_utils_test.py | 59 ++++++++++++++++++++
sdks/python/apache_beam/utils/retry.py | 7 +++
4 files changed, 88 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 4bee127..8d2e73e 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -25,6 +25,7 @@ from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
from apache_beam.examples import wordcount
+from apache_beam.testing.test_utils import delete_files
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
@@ -56,6 +57,9 @@ class WordCountIT(unittest.TestCase):
extra_opts = {'output': output,
'on_success_matcher': all_of(*pipeline_verifiers)}
+ # Register clean up before pipeline execution
+ self.addCleanup(delete_files, [output + '*'])
+
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/testing/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 9feb80e..26ca03d 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -24,6 +24,7 @@ import hashlib
import imp
from mock import Mock, patch
+from apache_beam.io.filesystems import FileSystems
from apache_beam.utils import retry
@@ -71,3 +72,20 @@ def patch_retry(testcase, module):
imp.reload(module)
testcase.addCleanup(remove_patches)
+
+
+@retry.with_exponential_backoff(
+ num_retries=3,
+ retry_filter=retry.retry_on_beam_io_error_filter)
+def delete_files(file_paths):
+ """A function to clean up files or directories using ``FileSystems``.
+
+ Glob is supported in file path and directories will be deleted recursively.
+
+ Args:
+ file_paths: A list of strings contains file paths or directories.
+ """
+ if len(file_paths) == 0:
+ raise RuntimeError('Clean up failed. Invalid file path: %s.' %
+ file_paths)
+ FileSystems.delete(file_paths)
http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/testing/test_utils_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
new file mode 100644
index 0000000..bee0bd3
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for testing utilities,"""
+
+import logging
+import tempfile
+import unittest
+from mock import patch
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.testing import test_utils as utils
+
+
+class TestUtilsTest(unittest.TestCase):
+
+ def setUp(self):
+ utils.patch_retry(self, utils)
+ self.tmpdir = tempfile.mkdtemp()
+
+ def test_delete_files_succeeds(self):
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False)
+ assert FileSystems.exists(f.name)
+ utils.delete_files([f.name])
+ assert not FileSystems.exists(f.name)
+
+ @patch.object(FileSystems, 'delete', side_effect=BeamIOError(''))
+ def test_delete_files_fails_with_io_error(self, mocked_delete):
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False)
+ assert FileSystems.exists(f.name)
+
+ with self.assertRaises(BeamIOError):
+ utils.delete_files([f.name])
+ self.assertTrue(mocked_delete.called)
+ self.assertEqual(mocked_delete.call_count, 4)
+
+ def test_delete_files_fails_with_invalid_arg(self):
+ with self.assertRaises(RuntimeError):
+ utils.delete_files([])
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 1a8b907..08223b3 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -31,6 +31,8 @@ import sys
import time
import traceback
+from apache_beam.io.filesystem import BeamIOError
+
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
# TODO(sourabhbajaj): Remove the GCP specific error code to a submodule
@@ -99,6 +101,11 @@ def retry_on_server_errors_and_timeout_filter(exception):
return retry_on_server_errors_filter(exception)
+def retry_on_beam_io_error_filter(exception):
+ """Filter allowing retries on Beam IO errors."""
+ return isinstance(exception, BeamIOError)
+
+
SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599]
[2/2] beam git commit: This closes #3688
Posted by al...@apache.org.
This closes #3688
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a5deeac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a5deeac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a5deeac
Branch: refs/heads/master
Commit: 0a5deeac5717096f59d4dce99ac56f81df7abe31
Parents: 7a34042 a17978a
Author: Ahmet Altay <al...@google.com>
Authored: Wed Aug 9 17:49:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Aug 9 17:49:54 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 4 ++
sdks/python/apache_beam/testing/test_utils.py | 18 ++++++
.../apache_beam/testing/test_utils_test.py | 59 ++++++++++++++++++++
sdks/python/apache_beam/utils/retry.py | 7 +++
4 files changed, 88 insertions(+)
----------------------------------------------------------------------