You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/12 20:17:23 UTC
[2/3] beam git commit: [BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource
[BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c955ad63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c955ad63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c955ad63
Branch: refs/heads/python-sdk
Commit: c955ad639e04687559dff3279b75a6e75934313d
Parents: feca7cf
Author: Mark Liu <ma...@google.com>
Authored: Wed Jan 11 15:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:08 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 7 ++--
sdks/python/apache_beam/test_pipeline.py | 2 +-
sdks/python/apache_beam/test_pipeline_test.py | 7 ++--
.../apache_beam/tests/pipeline_verifiers.py | 36 +++++++++++---------
.../tests/pipeline_verifiers_test.py | 31 +++++++++--------
sdks/python/apache_beam/tests/test_utils.py | 18 +++++++---
6 files changed, 59 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index bf63688..77926bb 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,7 +20,6 @@
import logging
import unittest
-from datetime import datetime as dt
from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
@@ -32,7 +31,9 @@ from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
class WordCountIT(unittest.TestCase):
- DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+ # The default checksum is a SHA-1 hash generated from a sorted list of
+ # lines read from expected output.
+ DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
@attr('IT')
def test_wordcount_it(self):
@@ -40,7 +41,7 @@ class WordCountIT(unittest.TestCase):
# Set extra options to the pipeline for test purpose
output = '/'.join([test_pipeline.get_option('output'),
- dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+ test_pipeline.get_option('job_name'),
'results'])
pipeline_verifiers = [PipelineStateMatcher(),
FileChecksumMatcher(output + '*-of-*',
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69040d1..69f4ddd 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -117,7 +117,7 @@ class TestPipeline(Pipeline):
Append extra pipeline options to existing option list if provided.
Test verifier (if contains in extra options) should be pickled before
- append, and will be unpickled later in TestRunner.
+ appending, and will be unpickled later in the TestRunner.
"""
options = list(self.options_list)
for k, v in extra_opts.items():
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 41b935c..526a8a2 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -19,7 +19,6 @@
import logging
import unittest
-import mock
from hamcrest.core.base_matcher import BaseMatcher
from hamcrest.core.assert_that import assert_that as hc_assert_that
@@ -76,7 +75,9 @@ class TestPipelineTest(unittest.TestCase):
{'options': {'student': True},
'expected': ['--student']},
{'options': {'student': False},
- 'expected': []}
+ 'expected': []},
+ {'options': {'name': 'Mark', 'student': True},
+ 'expected': ['--name=Mark', '--student']}
]
def test_append_extra_options(self):
@@ -90,7 +91,7 @@ class TestPipelineTest(unittest.TestCase):
opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
matcher = pickler.loads(opt_list[0].split('=')[1])
self.assertTrue(isinstance(matcher, BaseMatcher))
- hc_assert_that(mock.Mock(), matcher)
+ hc_assert_that(None, matcher)
def test_get_option(self):
name, value = ('job', 'mockJob')
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index efcfbdf..6bf8d48 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -25,10 +25,12 @@ of test pipeline job. Customized verifier should extend
import hashlib
import logging
-from apache_beam.io.fileio import TextFileSource
+from apitools.base.py.exceptions import HttpError
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.fileio import ChannelFactory
from apache_beam.runners.runner import PipelineState
from apache_beam.utils import retry
-from hamcrest.core.base_matcher import BaseMatcher
MAX_RETRIES = 4
@@ -57,12 +59,10 @@ class PipelineStateMatcher(BaseMatcher):
.append_text(pipeline_result.current_state())
-def retry_on_fileio_error(exception):
- """Filter allowing retries on file I/O errors."""
- if isinstance(exception, RuntimeError) or \
+def retry_on_io_error_and_server_error(exception):
+ """Filter allowing retries on file I/O errors and service errors."""
+ if isinstance(exception, HttpError) or \
isinstance(exception, IOError):
- # GCS I/O raises RuntimeError and local filesystem I/O
- # raises IOError when file reading is failed.
return True
else:
return False
@@ -71,25 +71,27 @@ def retry_on_fileio_error(exception):
class FileChecksumMatcher(BaseMatcher):
"""Matcher that verifies file(s) content by comparing file checksum.
- Use fileio to fetch file(s) from given path. Currently, fileio supports
- local filesystem and GCS.
-
- File checksum is a SHA-1 hash computed from content of file(s).
+ Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
+ is a SHA-1 hash computed from content of file(s).
"""
def __init__(self, file_path, expected_checksum):
self.file_path = file_path
self.expected_checksum = expected_checksum
- @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
- retry_filter=retry_on_fileio_error)
+ @retry.with_exponential_backoff(
+ num_retries=MAX_RETRIES,
+ retry_filter=retry_on_io_error_and_server_error)
def _read_with_retry(self):
"""Read path with retry if I/O failed"""
- source = TextFileSource(self.file_path)
read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
+ matched_path = ChannelFactory.glob(self.file_path)
+ if not matched_path:
+ raise IOError('No such file or directory: %s' % self.file_path)
+ for path in matched_path:
+ with ChannelFactory.open(path, 'r') as f:
+ for line in f:
+ read_lines.append(line)
return read_lines
def _matches(self, _):
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 91dedad..84a4dbe 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -21,10 +21,11 @@ import logging
import tempfile
import unittest
+from apitools.base.py.exceptions import HttpError
from hamcrest import assert_that as hc_assert_that
from mock import Mock, patch
-from apache_beam.io.fileio import TextFileSource
+from apache_beam.io.fileio import ChannelFactory
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PipelineResult
from apache_beam.tests import pipeline_verifiers as verifiers
@@ -91,23 +92,25 @@ class PipelineVerifiersTest(unittest.TestCase):
case['expected_checksum'])
hc_assert_that(self._mock_result, matcher)
- @patch.object(TextFileSource, 'reader')
- def test_file_checksum_matcher_read_failed(self, mock_reader):
- mock_reader.side_effect = IOError('No file found.')
+ @patch.object(ChannelFactory, 'glob')
+ def test_file_checksum_matcher_read_failed(self, mock_glob):
+ mock_glob.side_effect = IOError('No file found.')
matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
with self.assertRaises(IOError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_reader.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
-
- @patch.object(TextFileSource, 'reader')
- def test_file_checksum_matcher_service_error(self, mock_reader):
- mock_reader.side_effect = RuntimeError('GCS service failed.')
- matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
- with self.assertRaises(RuntimeError):
+ self.assertTrue(mock_glob.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+
+ @patch.object(ChannelFactory, 'glob')
+ def test_file_checksum_matcher_service_error(self, mock_glob):
+ mock_glob.side_effect = HttpError(
+ response={'status': '404'}, url='', content='Not Found',
+ )
+ matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+ with self.assertRaises(HttpError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_reader.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+ self.assertTrue(mock_glob.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index fc99fe9..3fdfe88 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -23,8 +23,18 @@ from mock import Mock, patch
from apache_beam.utils import retry
-def patch_retry(cls, module):
- """A function to patch retry module to use mock clock and logger."""
+def patch_retry(testcase, module):
+ """A function to patch retry module to use mock clock and logger.
+
+ The clock and logger defined in the retry decorator are replaced in tests
+ in order to skip the sleep phase when a retry happens.
+
+ Args:
+ testcase: An instance of unittest.TestCase that calls this function to
+ patch retry module.
+ module: The module that uses retry and needs to be replaced with mock
+ clock and logger in test.
+ """
real_retry_with_exponential_backoff = retry.with_exponential_backoff
def patched_retry_with_exponential_backoff(num_retries, retry_filter):
@@ -39,9 +49,9 @@ def patch_retry(cls, module):
# Reload module after patching.
imp.reload(module)
- def kill_patches():
+ def remove_patches():
patch.stopall()
# Reload module again after removing patch.
imp.reload(module)
- cls.addCleanup(kill_patches)
+ testcase.addCleanup(remove_patches)