You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/12 20:17:23 UTC

[2/3] beam git commit: [BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource

[BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c955ad63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c955ad63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c955ad63

Branch: refs/heads/python-sdk
Commit: c955ad639e04687559dff3279b75a6e75934313d
Parents: feca7cf
Author: Mark Liu <ma...@google.com>
Authored: Wed Jan 11 15:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:08 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   |  7 ++--
 sdks/python/apache_beam/test_pipeline.py        |  2 +-
 sdks/python/apache_beam/test_pipeline_test.py   |  7 ++--
 .../apache_beam/tests/pipeline_verifiers.py     | 36 +++++++++++---------
 .../tests/pipeline_verifiers_test.py            | 31 +++++++++--------
 sdks/python/apache_beam/tests/test_utils.py     | 18 +++++++---
 6 files changed, 59 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index bf63688..77926bb 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,7 +20,6 @@
 import logging
 import unittest
 
-from datetime import datetime as dt
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
@@ -32,7 +31,9 @@ from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
 
 class WordCountIT(unittest.TestCase):
 
-  DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+  # The default checksum is a SHA-1 hash generated from a sorted list of
+  # lines read from expected output.
+  DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
 
   @attr('IT')
   def test_wordcount_it(self):
@@ -40,7 +41,7 @@ class WordCountIT(unittest.TestCase):
 
     # Set extra options to the pipeline for test purpose
     output = '/'.join([test_pipeline.get_option('output'),
-                       dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+                       test_pipeline.get_option('job_name'),
                        'results'])
     pipeline_verifiers = [PipelineStateMatcher(),
                           FileChecksumMatcher(output + '*-of-*',

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69040d1..69f4ddd 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -117,7 +117,7 @@ class TestPipeline(Pipeline):
 
     Append extra pipeline options to existing option list if provided.
     Test verifier (if contains in extra options) should be pickled before
-    append, and will be unpickled later in TestRunner.
+    appending, and will be unpickled later in the TestRunner.
     """
     options = list(self.options_list)
     for k, v in extra_opts.items():

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 41b935c..526a8a2 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -19,7 +19,6 @@
 
 import logging
 import unittest
-import mock
 
 from hamcrest.core.base_matcher import BaseMatcher
 from hamcrest.core.assert_that import assert_that as hc_assert_that
@@ -76,7 +75,9 @@ class TestPipelineTest(unittest.TestCase):
       {'options': {'student': True},
        'expected': ['--student']},
       {'options': {'student': False},
-       'expected': []}
+       'expected': []},
+      {'options': {'name': 'Mark', 'student': True},
+       'expected': ['--name=Mark', '--student']}
   ]
 
   def test_append_extra_options(self):
@@ -90,7 +91,7 @@ class TestPipelineTest(unittest.TestCase):
     opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
     matcher = pickler.loads(opt_list[0].split('=')[1])
     self.assertTrue(isinstance(matcher, BaseMatcher))
-    hc_assert_that(mock.Mock(), matcher)
+    hc_assert_that(None, matcher)
 
   def test_get_option(self):
     name, value = ('job', 'mockJob')

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index efcfbdf..6bf8d48 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -25,10 +25,12 @@ of test pipeline job. Customized verifier should extend
 import hashlib
 import logging
 
-from apache_beam.io.fileio import TextFileSource
+from apitools.base.py.exceptions import HttpError
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
 from apache_beam.utils import retry
-from hamcrest.core.base_matcher import BaseMatcher
 
 MAX_RETRIES = 4
 
@@ -57,12 +59,10 @@ class PipelineStateMatcher(BaseMatcher):
       .append_text(pipeline_result.current_state())
 
 
-def retry_on_fileio_error(exception):
-  """Filter allowing retries on file I/O errors."""
-  if isinstance(exception, RuntimeError) or \
+def retry_on_io_error_and_server_error(exception):
+  """Filter allowing retries on file I/O errors and service errors."""
+  if isinstance(exception, HttpError) or \
           isinstance(exception, IOError):
-    # GCS I/O raises RuntimeError and local filesystem I/O
-    # raises IOError when file reading is failed.
     return True
   else:
     return False
@@ -71,25 +71,27 @@ def retry_on_fileio_error(exception):
 class FileChecksumMatcher(BaseMatcher):
   """Matcher that verifies file(s) content by comparing file checksum.
 
-  Use fileio to fetch file(s) from given path. Currently, fileio supports
-  local filesystem and GCS.
-
-  File checksum is a SHA-1 hash computed from content of file(s).
+  Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
+  is a SHA-1 hash computed from content of file(s).
   """
 
   def __init__(self, file_path, expected_checksum):
     self.file_path = file_path
     self.expected_checksum = expected_checksum
 
-  @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
-                                  retry_filter=retry_on_fileio_error)
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_io_error_and_server_error)
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
-    source = TextFileSource(self.file_path)
     read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
+    matched_path = ChannelFactory.glob(self.file_path)
+    if not matched_path:
+      raise IOError('No such file or directory: %s' % self.file_path)
+    for path in matched_path:
+      with ChannelFactory.open(path, 'r') as f:
+        for line in f:
+          read_lines.append(line)
     return read_lines
 
   def _matches(self, _):

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 91dedad..84a4dbe 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -21,10 +21,11 @@ import logging
 import tempfile
 import unittest
 
+from apitools.base.py.exceptions import HttpError
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
-from apache_beam.io.fileio import TextFileSource
+from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
@@ -91,23 +92,25 @@ class PipelineVerifiersTest(unittest.TestCase):
                                               case['expected_checksum'])
       hc_assert_that(self._mock_result, matcher)
 
-  @patch.object(TextFileSource, 'reader')
-  def test_file_checksum_matcher_read_failed(self, mock_reader):
-    mock_reader.side_effect = IOError('No file found.')
+  @patch.object(ChannelFactory, 'glob')
+  def test_file_checksum_matcher_read_failed(self, mock_glob):
+    mock_glob.side_effect = IOError('No file found.')
     matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
     with self.assertRaises(IOError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_reader.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
-
-  @patch.object(TextFileSource, 'reader')
-  def test_file_checksum_matcher_service_error(self, mock_reader):
-    mock_reader.side_effect = RuntimeError('GCS service failed.')
-    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
-    with self.assertRaises(RuntimeError):
+    self.assertTrue(mock_glob.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+
+  @patch.object(ChannelFactory, 'glob')
+  def test_file_checksum_matcher_service_error(self, mock_glob):
+    mock_glob.side_effect = HttpError(
+        response={'status': '404'}, url='', content='Not Found',
+    )
+    matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+    with self.assertRaises(HttpError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_reader.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+    self.assertTrue(mock_glob.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index fc99fe9..3fdfe88 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -23,8 +23,18 @@ from mock import Mock, patch
 from apache_beam.utils import retry
 
 
-def patch_retry(cls, module):
-  """A function to patch retry module to use mock clock and logger."""
+def patch_retry(testcase, module):
+  """A function to patch retry module to use mock clock and logger.
+
+  The clock and logger defined in the retry decorator are replaced in tests
+  in order to skip the sleep phase when a retry happens.
+
+  Args:
+    testcase: An instance of unittest.TestCase that calls this function to
+      patch retry module.
+    module: The module that uses retry and needs to be replaced with mock
+      clock and logger in test.
+  """
   real_retry_with_exponential_backoff = retry.with_exponential_backoff
 
   def patched_retry_with_exponential_backoff(num_retries, retry_filter):
@@ -39,9 +49,9 @@ def patch_retry(cls, module):
   # Reload module after patching.
   imp.reload(module)
 
-  def kill_patches():
+  def remove_patches():
     patch.stopall()
     # Reload module again after removing patch.
     imp.reload(module)
 
-  cls.addCleanup(kill_patches)
+  testcase.addCleanup(remove_patches)