You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/12 20:17:22 UTC
[1/3] beam git commit: [BEAM-1188] Python File Verifier For E2E Tests
Repository: beam
Updated Branches:
refs/heads/python-sdk 4ba0b60a8 -> e23c3cab3
[BEAM-1188] Python File Verifier For E2E Tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/feca7cfe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/feca7cfe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/feca7cfe
Branch: refs/heads/python-sdk
Commit: feca7cfe45a33f46f55da23368a7ab601d307b62
Parents: 4ba0b60
Author: Mark Liu <ma...@google.com>
Authored: Mon Jan 9 23:48:42 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:07 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 23 ++++--
.../apache_beam/io/datastore/v1/helper_test.py | 31 +--------
sdks/python/apache_beam/test_pipeline.py | 67 +++++++++++-------
sdks/python/apache_beam/test_pipeline_test.py | 73 ++++++++++++++++----
.../apache_beam/tests/pipeline_verifiers.py | 67 ++++++++++++++++++
.../tests/pipeline_verifiers_test.py | 65 +++++++++++++++--
sdks/python/apache_beam/tests/test_utils.py | 47 +++++++++++++
7 files changed, 298 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index b74e075..bf63688 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,23 +20,38 @@
import logging
import unittest
+from datetime import datetime as dt
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
from apache_beam.examples import wordcount
from apache_beam.test_pipeline import TestPipeline
from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from nose.plugins.attrib import attr
+from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
class WordCountIT(unittest.TestCase):
+ DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+
@attr('IT')
def test_wordcount_it(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+
# Set extra options to the pipeline for test purpose
- extra_opts = {'on_success_matcher': PipelineStateMatcher()}
+ output = '/'.join([test_pipeline.get_option('output'),
+ dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+ 'results'])
+ pipeline_verifiers = [PipelineStateMatcher(),
+ FileChecksumMatcher(output + '*-of-*',
+ self.DEFAULT_CHECKSUM)]
+ extra_opts = {'output': output,
+ 'on_success_matcher': all_of(*pipeline_verifiers)}
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
- test_pipeline = TestPipeline(is_integration_test=True)
- wordcount.run(test_pipeline.get_test_option_args(**extra_opts))
+ wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
index 6f45993..051d150 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -16,7 +16,6 @@
#
"""Tests for datastore helper."""
-import imp
import sys
import unittest
@@ -26,11 +25,11 @@ from google.datastore.v1 import query_pb2
from google.datastore.v1.entity_pb2 import Key
from googledatastore.connection import RPCError
from googledatastore import helper as datastore_helper
-from mock import MagicMock, Mock, patch
+from mock import MagicMock
from apache_beam.io.datastore.v1 import fake_datastore
from apache_beam.io.datastore.v1 import helper
-from apache_beam.utils import retry
+from apache_beam.tests.test_utils import patch_retry
class HelperTest(unittest.TestCase):
@@ -39,31 +38,7 @@ class HelperTest(unittest.TestCase):
self._mock_datastore = MagicMock()
self._query = query_pb2.Query()
self._query.kind.add().name = 'dummy_kind'
- self.patch_retry()
-
- def patch_retry(self):
-
- """A function to patch retry module to use mock clock and logger."""
- real_retry_with_exponential_backoff = retry.with_exponential_backoff
-
- def patched_retry_with_exponential_backoff(num_retries, retry_filter):
- """A patch for retry decorator to use a mock dummy clock and logger."""
- return real_retry_with_exponential_backoff(
- num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
- clock=Mock())
-
- patch.object(retry, 'with_exponential_backoff',
- side_effect=patched_retry_with_exponential_backoff).start()
-
- # Reload module after patching.
- imp.reload(helper)
-
- def kill_patches():
- patch.stopall()
- # Reload module again after removing patch.
- imp.reload(helper)
-
- self.addCleanup(kill_patches)
+ patch_retry(self, helper)
def permanent_datastore_failure(self, req):
raise RPCError("dummy", 500, "failed")
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 48b98b2..69040d1 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -78,34 +78,18 @@ class TestPipeline(Pipeline):
expected type.
"""
self.is_integration_test = is_integration_test
+ self.options_list = self._parse_test_option_args(argv)
if options is None:
- options = PipelineOptions(self.get_test_option_args(argv))
+ options = PipelineOptions(self.options_list)
super(TestPipeline, self).__init__(runner, options)
- def _append_extra_opts(self, opt_list, extra_opts):
- """Append extra pipeline options to existing option list.
-
- Test verifier (if contains) should be pickled before append, and
- will be unpickled later in TestRunner.
- """
- for k, v in extra_opts.items():
- if not v:
- continue
- elif isinstance(v, bool) and v:
- opt_list.append('--%s' % k)
- elif 'matcher' in k:
- opt_list.append('--%s=%s' % (k, pickler.dumps(v)))
- else:
- opt_list.append('--%s=%s' % (k, v))
-
- def get_test_option_args(self, argv=None, **kwargs):
- """Get pipeline options as argument list by parsing value of command line
- argument: --test-pipeline-options combined with given extra options.
+ def _parse_test_option_args(self, argv):
+ """Parse value of command line argument: --test-pipeline-options to get
+ pipeline options.
Args:
argv: An iterable of command line arguments to be used. If not specified
then sys.argv will be used as input for parsing arguments.
- kwargs: Extra pipeline options for the test.
Returns:
An argument list of options that can be parsed by argparser or directly
@@ -125,10 +109,43 @@ class TestPipeline(Pipeline):
raise SkipTest('IT is skipped because --test-pipeline-options '
'is not specified')
- options_list = shlex.split(known.test_pipeline_options) \
+ return shlex.split(known.test_pipeline_options) \
if known.test_pipeline_options else []
- if kwargs:
- self._append_extra_opts(options_list, kwargs)
+ def get_full_options_as_args(self, **extra_opts):
+ """Get full pipeline options as an argument list.
+
+ Append extra pipeline options to existing option list if provided.
+ Test verifier (if contains in extra options) should be pickled before
+ append, and will be unpickled later in TestRunner.
+ """
+ options = list(self.options_list)
+ for k, v in extra_opts.items():
+ if not v:
+ continue
+ elif isinstance(v, bool) and v:
+ options.append('--%s' % k)
+ elif 'matcher' in k:
+ options.append('--%s=%s' % (k, pickler.dumps(v)))
+ else:
+ options.append('--%s=%s' % (k, v))
+ return options
+
+ def get_option(self, opt_name):
+ """Get a pipeline option value by name
- return options_list
+ Args:
+ opt_name: The name of the pipeline option.
+
+ Returns:
+ None if option is not found in existing option list which is generated
+ by parsing value of argument `test-pipeline-options`.
+ """
+ parser = argparse.ArgumentParser()
+ opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name
+ # Option name should start with '--' when it's used for parsing.
+ parser.add_argument('--' + opt_name,
+ type=str,
+ action='store')
+ known, _ = parser.parse_known_args(self.options_list)
+ return getattr(known, opt_name) if hasattr(known, opt_name) else None
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 42ba2d7..41b935c 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -17,12 +17,24 @@
"""Unit test for the TestPipeline class"""
+import logging
import unittest
+import mock
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.assert_that import assert_that as hc_assert_that
+
+from apache_beam.internal import pickler
from apache_beam.test_pipeline import TestPipeline
from apache_beam.utils.pipeline_options import PipelineOptions
+# A simple matcher that is used for testing extra options appending.
+class SimpleMatcher(BaseMatcher):
+ def _matches(self, item):
+ return True
+
+
class TestPipelineTest(unittest.TestCase):
TEST_CASE = {'options':
@@ -32,9 +44,6 @@ class TestPipelineTest(unittest.TestCase):
'male': True,
'age': 1}}
- def setUp(self):
- self.pipeline = TestPipeline()
-
# Used for testing pipeline option creation.
class TestParsingOptions(PipelineOptions):
@@ -45,19 +54,57 @@ class TestPipelineTest(unittest.TestCase):
parser.add_argument('--age', action='store', type=int, help='mock age')
def test_option_args_parsing(self):
+ test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
self.assertListEqual(
- self.pipeline.get_test_option_args(argv=self.TEST_CASE['options']),
+ test_pipeline.get_full_options_as_args(),
self.TEST_CASE['expected_list'])
+ def test_empty_option_args_parsing(self):
+ test_pipeline = TestPipeline()
+ self.assertListEqual([],
+ test_pipeline.get_full_options_as_args())
+
def test_create_test_pipeline_options(self):
- test_options = PipelineOptions(
- self.pipeline.get_test_option_args(self.TEST_CASE['options']))
- self.assertDictContainsSubset(
- self.TEST_CASE['expected_dict'], test_options.get_all_options())
+ test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+ test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
+ self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
+ test_options.get_all_options())
+
+ EXTRA_OPT_CASES = [
+ {'options': {'name': 'Mark'},
+ 'expected': ['--name=Mark']},
+ {'options': {'student': True},
+ 'expected': ['--student']},
+ {'options': {'student': False},
+ 'expected': []}
+ ]
def test_append_extra_options(self):
- extra_opt = {'name': 'Mark'}
- options_list = self.pipeline.get_test_option_args(
- argv=self.TEST_CASE['options'], **extra_opt)
- expected_list = self.TEST_CASE['expected_list'] + ['--name=Mark']
- self.assertListEqual(expected_list, options_list)
+ test_pipeline = TestPipeline()
+ for case in self.EXTRA_OPT_CASES:
+ opt_list = test_pipeline.get_full_options_as_args(**case['options'])
+ self.assertListEqual(opt_list, case['expected'])
+
+ def test_append_verifier_in_extra_opt(self):
+ extra_opt = {'matcher': SimpleMatcher()}
+ opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
+ matcher = pickler.loads(opt_list[0].split('=')[1])
+ self.assertTrue(isinstance(matcher, BaseMatcher))
+ hc_assert_that(mock.Mock(), matcher)
+
+ def test_get_option(self):
+ name, value = ('job', 'mockJob')
+ test_pipeline = TestPipeline()
+ test_pipeline.options_list = ['--%s=%s' % (name, value)]
+ self.assertEqual(test_pipeline.get_option(name), value)
+
+ def test_skip_IT(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ test_pipeline.run()
+ # Note that this will never be reached since it should be skipped above.
+ self.fail()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 1a6dd45..efcfbdf 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,9 +22,16 @@ of test pipeline job. Customized verifier should extend
`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
"""
+import hashlib
+import logging
+
+from apache_beam.io.fileio import TextFileSource
from apache_beam.runners.runner import PipelineState
+from apache_beam.utils import retry
from hamcrest.core.base_matcher import BaseMatcher
+MAX_RETRIES = 4
+
class PipelineStateMatcher(BaseMatcher):
"""Matcher that verify pipeline job terminated in expected state
@@ -48,3 +55,63 @@ class PipelineStateMatcher(BaseMatcher):
mismatch_description \
.append_text("Test pipeline job terminated in state: ") \
.append_text(pipeline_result.current_state())
+
+
+def retry_on_fileio_error(exception):
+ """Filter allowing retries on file I/O errors."""
+ if isinstance(exception, RuntimeError) or \
+ isinstance(exception, IOError):
+ # GCS I/O raises RuntimeError and local filesystem I/O
+ # raises IOError when file reading is failed.
+ return True
+ else:
+ return False
+
+
+class FileChecksumMatcher(BaseMatcher):
+ """Matcher that verifies file(s) content by comparing file checksum.
+
+ Use fileio to fetch file(s) from given path. Currently, fileio supports
+ local filesystem and GCS.
+
+ File checksum is a SHA-1 hash computed from content of file(s).
+ """
+
+ def __init__(self, file_path, expected_checksum):
+ self.file_path = file_path
+ self.expected_checksum = expected_checksum
+
+ @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
+ retry_filter=retry_on_fileio_error)
+ def _read_with_retry(self):
+ """Read path with retry if I/O failed"""
+ source = TextFileSource(self.file_path)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ return read_lines
+
+ def _matches(self, _):
+ # Read from given file(s) path
+ read_lines = self._read_with_retry()
+
+ # Compute checksum
+ read_lines.sort()
+ m = hashlib.new('sha1')
+ for line in read_lines:
+ m.update(line)
+ self.checksum, num_lines = (m.hexdigest(), len(read_lines))
+ logging.info('Read from given path %s, %d lines, checksum: %s.',
+ self.file_path, num_lines, self.checksum)
+ return self.checksum == self.expected_checksum
+
+ def describe_to(self, description):
+ description \
+ .append_text("Expected checksum is ") \
+ .append_text(self.expected_checksum)
+
+ def describe_mismatch(self, pipeline_result, mismatch_description):
+ mismatch_description \
+ .append_text("Actual checksum is ") \
+ .append_text(self.checksum)
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 9f70b45..91dedad 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -18,27 +18,37 @@
"""Unit tests for the test pipeline verifiers"""
import logging
+import tempfile
import unittest
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.fileio import TextFileSource
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PipelineResult
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from hamcrest import assert_that as hc_assert_that
+from apache_beam.tests import pipeline_verifiers as verifiers
+from apache_beam.tests.test_utils import patch_retry
class PipelineVerifiersTest(unittest.TestCase):
+ def setUp(self):
+ self._mock_result = Mock()
+ patch_retry(self, verifiers)
+
def test_pipeline_state_matcher_success(self):
"""Test PipelineStateMatcher successes when using default expected state
and job actually finished in DONE
"""
pipeline_result = PipelineResult(PipelineState.DONE)
- hc_assert_that(pipeline_result, PipelineStateMatcher())
+ hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
def test_pipeline_state_matcher_given_state(self):
"""Test PipelineStateMatcher successes when matches given state"""
pipeline_result = PipelineResult(PipelineState.FAILED)
- hc_assert_that(pipeline_result, PipelineStateMatcher(PipelineState.FAILED))
+ hc_assert_that(pipeline_result,
+ verifiers.PipelineStateMatcher(PipelineState.FAILED))
def test_pipeline_state_matcher_fails(self):
"""Test PipelineStateMatcher fails when using default expected state
@@ -53,7 +63,52 @@ class PipelineVerifiersTest(unittest.TestCase):
for state in failed_state:
pipeline_result = PipelineResult(state)
with self.assertRaises(AssertionError):
- hc_assert_that(pipeline_result, PipelineStateMatcher())
+ hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+ test_cases = [
+ {'content': 'Test FileChecksumMatcher with single file',
+ 'num_files': 1,
+ 'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
+ {'content': 'Test FileChecksumMatcher with multiple files',
+ 'num_files': 3,
+ 'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
+ {'content': '',
+ 'num_files': 1,
+ 'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
+ ]
+
+ def create_temp_file(self, content, directory=None):
+ with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
+ f.write(content)
+ return f.name
+
+ def test_file_checksum_matcher_success(self):
+ for case in self.test_cases:
+ temp_dir = tempfile.mkdtemp()
+ for _ in range(case['num_files']):
+ self.create_temp_file(case['content'], temp_dir)
+ matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+ case['expected_checksum'])
+ hc_assert_that(self._mock_result, matcher)
+
+ @patch.object(TextFileSource, 'reader')
+ def test_file_checksum_matcher_read_failed(self, mock_reader):
+ mock_reader.side_effect = IOError('No file found.')
+ matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+ with self.assertRaises(IOError):
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mock_reader.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+
+ @patch.object(TextFileSource, 'reader')
+ def test_file_checksum_matcher_service_error(self, mock_reader):
+ mock_reader.side_effect = RuntimeError('GCS service failed.')
+ matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+ with self.assertRaises(RuntimeError):
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mock_reader.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
new file mode 100644
index 0000000..fc99fe9
--- /dev/null
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility methods for testing"""
+
+import imp
+from mock import Mock, patch
+
+from apache_beam.utils import retry
+
+
+def patch_retry(cls, module):
+ """A function to patch retry module to use mock clock and logger."""
+ real_retry_with_exponential_backoff = retry.with_exponential_backoff
+
+ def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+ """A patch for retry decorator to use a mock dummy clock and logger."""
+ return real_retry_with_exponential_backoff(
+ num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
+ clock=Mock())
+
+ patch.object(retry, 'with_exponential_backoff',
+ side_effect=patched_retry_with_exponential_backoff).start()
+
+ # Reload module after patching.
+ imp.reload(module)
+
+ def kill_patches():
+ patch.stopall()
+ # Reload module again after removing patch.
+ imp.reload(module)
+
+ cls.addCleanup(kill_patches)
[3/3] beam git commit: Closes #1756
Posted by ro...@apache.org.
Closes #1756
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e23c3cab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e23c3cab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e23c3cab
Branch: refs/heads/python-sdk
Commit: e23c3cab3b9ad261c28fb4049de30d4b05522bfb
Parents: 4ba0b60 c955ad6
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jan 12 12:17:09 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:09 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 24 +++++--
.../apache_beam/io/datastore/v1/helper_test.py | 31 +-------
sdks/python/apache_beam/test_pipeline.py | 67 +++++++++++-------
sdks/python/apache_beam/test_pipeline_test.py | 74 ++++++++++++++++----
.../apache_beam/tests/pipeline_verifiers.py | 71 ++++++++++++++++++-
.../tests/pipeline_verifiers_test.py | 68 ++++++++++++++++--
sdks/python/apache_beam/tests/test_utils.py | 57 +++++++++++++++
7 files changed, 316 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource
Posted by ro...@apache.org.
[BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c955ad63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c955ad63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c955ad63
Branch: refs/heads/python-sdk
Commit: c955ad639e04687559dff3279b75a6e75934313d
Parents: feca7cf
Author: Mark Liu <ma...@google.com>
Authored: Wed Jan 11 15:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:08 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 7 ++--
sdks/python/apache_beam/test_pipeline.py | 2 +-
sdks/python/apache_beam/test_pipeline_test.py | 7 ++--
.../apache_beam/tests/pipeline_verifiers.py | 36 +++++++++++---------
.../tests/pipeline_verifiers_test.py | 31 +++++++++--------
sdks/python/apache_beam/tests/test_utils.py | 18 +++++++---
6 files changed, 59 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index bf63688..77926bb 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,7 +20,6 @@
import logging
import unittest
-from datetime import datetime as dt
from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
@@ -32,7 +31,9 @@ from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
class WordCountIT(unittest.TestCase):
- DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+ # The default checksum is a SHA-1 hash generated from a sorted list of
+ # lines read from expected output.
+ DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
@attr('IT')
def test_wordcount_it(self):
@@ -40,7 +41,7 @@ class WordCountIT(unittest.TestCase):
# Set extra options to the pipeline for test purpose
output = '/'.join([test_pipeline.get_option('output'),
- dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+ test_pipeline.get_option('job_name'),
'results'])
pipeline_verifiers = [PipelineStateMatcher(),
FileChecksumMatcher(output + '*-of-*',
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69040d1..69f4ddd 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -117,7 +117,7 @@ class TestPipeline(Pipeline):
Append extra pipeline options to existing option list if provided.
Test verifier (if contains in extra options) should be pickled before
- append, and will be unpickled later in TestRunner.
+ appending, and will be unpickled later in the TestRunner.
"""
options = list(self.options_list)
for k, v in extra_opts.items():
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 41b935c..526a8a2 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -19,7 +19,6 @@
import logging
import unittest
-import mock
from hamcrest.core.base_matcher import BaseMatcher
from hamcrest.core.assert_that import assert_that as hc_assert_that
@@ -76,7 +75,9 @@ class TestPipelineTest(unittest.TestCase):
{'options': {'student': True},
'expected': ['--student']},
{'options': {'student': False},
- 'expected': []}
+ 'expected': []},
+ {'options': {'name': 'Mark', 'student': True},
+ 'expected': ['--name=Mark', '--student']}
]
def test_append_extra_options(self):
@@ -90,7 +91,7 @@ class TestPipelineTest(unittest.TestCase):
opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
matcher = pickler.loads(opt_list[0].split('=')[1])
self.assertTrue(isinstance(matcher, BaseMatcher))
- hc_assert_that(mock.Mock(), matcher)
+ hc_assert_that(None, matcher)
def test_get_option(self):
name, value = ('job', 'mockJob')
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index efcfbdf..6bf8d48 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -25,10 +25,12 @@ of test pipeline job. Customized verifier should extend
import hashlib
import logging
-from apache_beam.io.fileio import TextFileSource
+from apitools.base.py.exceptions import HttpError
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.fileio import ChannelFactory
from apache_beam.runners.runner import PipelineState
from apache_beam.utils import retry
-from hamcrest.core.base_matcher import BaseMatcher
MAX_RETRIES = 4
@@ -57,12 +59,10 @@ class PipelineStateMatcher(BaseMatcher):
.append_text(pipeline_result.current_state())
-def retry_on_fileio_error(exception):
- """Filter allowing retries on file I/O errors."""
- if isinstance(exception, RuntimeError) or \
+def retry_on_io_error_and_server_error(exception):
+ """Filter allowing retries on file I/O errors and service error."""
+ if isinstance(exception, HttpError) or \
isinstance(exception, IOError):
- # GCS I/O raises RuntimeError and local filesystem I/O
- # raises IOError when file reading is failed.
return True
else:
return False
@@ -71,25 +71,27 @@ def retry_on_fileio_error(exception):
class FileChecksumMatcher(BaseMatcher):
"""Matcher that verifies file(s) content by comparing file checksum.
- Use fileio to fetch file(s) from given path. Currently, fileio supports
- local filesystem and GCS.
-
- File checksum is a SHA-1 hash computed from content of file(s).
+ Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
+ is a SHA-1 hash computed from content of file(s).
"""
def __init__(self, file_path, expected_checksum):
self.file_path = file_path
self.expected_checksum = expected_checksum
- @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
- retry_filter=retry_on_fileio_error)
+ @retry.with_exponential_backoff(
+ num_retries=MAX_RETRIES,
+ retry_filter=retry_on_io_error_and_server_error)
def _read_with_retry(self):
"""Read path with retry if I/O failed"""
- source = TextFileSource(self.file_path)
read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
+ matched_path = ChannelFactory.glob(self.file_path)
+ if not matched_path:
+ raise IOError('No such file or directory: %s' % self.file_path)
+ for path in matched_path:
+ with ChannelFactory.open(path, 'r') as f:
+ for line in f:
+ read_lines.append(line)
return read_lines
def _matches(self, _):
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 91dedad..84a4dbe 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -21,10 +21,11 @@ import logging
import tempfile
import unittest
+from apitools.base.py.exceptions import HttpError
from hamcrest import assert_that as hc_assert_that
from mock import Mock, patch
-from apache_beam.io.fileio import TextFileSource
+from apache_beam.io.fileio import ChannelFactory
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PipelineResult
from apache_beam.tests import pipeline_verifiers as verifiers
@@ -91,23 +92,25 @@ class PipelineVerifiersTest(unittest.TestCase):
case['expected_checksum'])
hc_assert_that(self._mock_result, matcher)
- @patch.object(TextFileSource, 'reader')
- def test_file_checksum_matcher_read_failed(self, mock_reader):
- mock_reader.side_effect = IOError('No file found.')
+ @patch.object(ChannelFactory, 'glob')
+ def test_file_checksum_matcher_read_failed(self, mock_glob):
+ mock_glob.side_effect = IOError('No file found.')
matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
with self.assertRaises(IOError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_reader.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
-
- @patch.object(TextFileSource, 'reader')
- def test_file_checksum_matcher_service_error(self, mock_reader):
- mock_reader.side_effect = RuntimeError('GCS service failed.')
- matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
- with self.assertRaises(RuntimeError):
+ self.assertTrue(mock_glob.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+
+ @patch.object(ChannelFactory, 'glob')
+ def test_file_checksum_matcher_service_error(self, mock_glob):
+ mock_glob.side_effect = HttpError(
+ response={'status': '404'}, url='', content='Not Found',
+ )
+ matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+ with self.assertRaises(HttpError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_reader.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+ self.assertTrue(mock_glob.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index fc99fe9..3fdfe88 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -23,8 +23,18 @@ from mock import Mock, patch
from apache_beam.utils import retry
-def patch_retry(cls, module):
- """A function to patch retry module to use mock clock and logger."""
+def patch_retry(testcase, module):
+ """A function to patch retry module to use mock clock and logger.
+
+ Clock and logger that defined in retry decorator will be replaced in test
+ in order to skip sleep phase when retry happens.
+
+ Args:
+ testcase: An instance of unittest.TestCase that calls this function to
+ patch retry module.
+ module: The module that uses retry and need to be replaced with mock
+ clock and logger in test.
+ """
real_retry_with_exponential_backoff = retry.with_exponential_backoff
def patched_retry_with_exponential_backoff(num_retries, retry_filter):
@@ -39,9 +49,9 @@ def patch_retry(cls, module):
# Reload module after patching.
imp.reload(module)
- def kill_patches():
+ def remove_patches():
patch.stopall()
# Reload module again after removing patch.
imp.reload(module)
- cls.addCleanup(kill_patches)
+ testcase.addCleanup(remove_patches)