You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/12 20:17:22 UTC

[1/3] beam git commit: [BEAM-1188] Python File Verifer For E2E Tests

Repository: beam
Updated Branches:
  refs/heads/python-sdk 4ba0b60a8 -> e23c3cab3


[BEAM-1188] Python File Verifer For E2E Tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/feca7cfe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/feca7cfe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/feca7cfe

Branch: refs/heads/python-sdk
Commit: feca7cfe45a33f46f55da23368a7ab601d307b62
Parents: 4ba0b60
Author: Mark Liu <ma...@google.com>
Authored: Mon Jan 9 23:48:42 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:07 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   | 23 ++++--
 .../apache_beam/io/datastore/v1/helper_test.py  | 31 +--------
 sdks/python/apache_beam/test_pipeline.py        | 67 +++++++++++-------
 sdks/python/apache_beam/test_pipeline_test.py   | 73 ++++++++++++++++----
 .../apache_beam/tests/pipeline_verifiers.py     | 67 ++++++++++++++++++
 .../tests/pipeline_verifiers_test.py            | 65 +++++++++++++++--
 sdks/python/apache_beam/tests/test_utils.py     | 47 +++++++++++++
 7 files changed, 298 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index b74e075..bf63688 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,23 +20,38 @@
 import logging
 import unittest
 
+from datetime import datetime as dt
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
 from apache_beam.examples import wordcount
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from nose.plugins.attrib import attr
+from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
 
 
 class WordCountIT(unittest.TestCase):
 
+  DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+
   @attr('IT')
   def test_wordcount_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
     # Set extra options to the pipeline for test purpose
-    extra_opts = {'on_success_matcher': PipelineStateMatcher()}
+    output = '/'.join([test_pipeline.get_option('output'),
+                       dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+                       'results'])
+    pipeline_verifiers = [PipelineStateMatcher(),
+                          FileChecksumMatcher(output + '*-of-*',
+                                              self.DEFAULT_CHECKSUM)]
+    extra_opts = {'output': output,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
-    test_pipeline = TestPipeline(is_integration_test=True)
-    wordcount.run(test_pipeline.get_test_option_args(**extra_opts))
+    wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
index 6f45993..051d150 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -16,7 +16,6 @@
 #
 
 """Tests for datastore helper."""
-import imp
 import sys
 import unittest
 
@@ -26,11 +25,11 @@ from google.datastore.v1 import query_pb2
 from google.datastore.v1.entity_pb2 import Key
 from googledatastore.connection import RPCError
 from googledatastore import helper as datastore_helper
-from mock import MagicMock, Mock, patch
+from mock import MagicMock
 
 from apache_beam.io.datastore.v1 import fake_datastore
 from apache_beam.io.datastore.v1 import helper
-from apache_beam.utils import retry
+from apache_beam.tests.test_utils import patch_retry
 
 
 class HelperTest(unittest.TestCase):
@@ -39,31 +38,7 @@ class HelperTest(unittest.TestCase):
     self._mock_datastore = MagicMock()
     self._query = query_pb2.Query()
     self._query.kind.add().name = 'dummy_kind'
-    self.patch_retry()
-
-  def patch_retry(self):
-
-    """A function to patch retry module to use mock clock and logger."""
-    real_retry_with_exponential_backoff = retry.with_exponential_backoff
-
-    def patched_retry_with_exponential_backoff(num_retries, retry_filter):
-      """A patch for retry decorator to use a mock dummy clock and logger."""
-      return real_retry_with_exponential_backoff(
-          num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
-          clock=Mock())
-
-    patch.object(retry, 'with_exponential_backoff',
-                 side_effect=patched_retry_with_exponential_backoff).start()
-
-    # Reload module after patching.
-    imp.reload(helper)
-
-    def kill_patches():
-      patch.stopall()
-      # Reload module again after removing patch.
-      imp.reload(helper)
-
-    self.addCleanup(kill_patches)
+    patch_retry(self, helper)
 
   def permanent_datastore_failure(self, req):
     raise RPCError("dummy", 500, "failed")

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 48b98b2..69040d1 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -78,34 +78,18 @@ class TestPipeline(Pipeline):
       expected type.
     """
     self.is_integration_test = is_integration_test
+    self.options_list = self._parse_test_option_args(argv)
     if options is None:
-      options = PipelineOptions(self.get_test_option_args(argv))
+      options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
-  def _append_extra_opts(self, opt_list, extra_opts):
-    """Append extra pipeline options to existing option list.
-
-    Test verifier (if contains) should be pickled before append, and
-    will be unpickled later in TestRunner.
-    """
-    for k, v in extra_opts.items():
-      if not v:
-        continue
-      elif isinstance(v, bool) and v:
-        opt_list.append('--%s' % k)
-      elif 'matcher' in k:
-        opt_list.append('--%s=%s' % (k, pickler.dumps(v)))
-      else:
-        opt_list.append('--%s=%s' % (k, v))
-
-  def get_test_option_args(self, argv=None, **kwargs):
-    """Get pipeline options as argument list by parsing value of command line
-    argument: --test-pipeline-options combined with given extra options.
+  def _parse_test_option_args(self, argv):
+    """Parse value of command line argument: --test-pipeline-options to get
+    pipeline options.
 
     Args:
       argv: An iterable of command line arguments to be used. If not specified
         then sys.argv will be used as input for parsing arguments.
-      kwargs: Extra pipeline options for the test.
 
     Returns:
       An argument list of options that can be parsed by argparser or directly
@@ -125,10 +109,43 @@ class TestPipeline(Pipeline):
       raise SkipTest('IT is skipped because --test-pipeline-options '
                      'is not specified')
 
-    options_list = shlex.split(known.test_pipeline_options) \
+    return shlex.split(known.test_pipeline_options) \
       if known.test_pipeline_options else []
 
-    if kwargs:
-      self._append_extra_opts(options_list, kwargs)
+  def get_full_options_as_args(self, **extra_opts):
+    """Get full pipeline options as an argument list.
+
+    Append extra pipeline options to existing option list if provided.
+    Test verifier (if contains in extra options) should be pickled before
+    append, and will be unpickled later in TestRunner.
+    """
+    options = list(self.options_list)
+    for k, v in extra_opts.items():
+      if not v:
+        continue
+      elif isinstance(v, bool) and v:
+        options.append('--%s' % k)
+      elif 'matcher' in k:
+        options.append('--%s=%s' % (k, pickler.dumps(v)))
+      else:
+        options.append('--%s=%s' % (k, v))
+    return options
+
+  def get_option(self, opt_name):
+    """Get a pipeline option value by name.
 
-    return options_list
+    Args:
+      opt_name: The name of the pipeline option.
+
+    Returns:
+      The option value as a string, or None if the option is not found in
+      the option list parsed from the argument `test-pipeline-options`.
+    """
+    parser = argparse.ArgumentParser()
+    opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name
+    # Option name should start with '--' when it's used for parsing.
+    parser.add_argument('--' + opt_name,
+                        type=str,
+                        action='store')
+    known, _ = parser.parse_known_args(self.options_list)
+    return getattr(known, opt_name) if hasattr(known, opt_name) else None

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 42ba2d7..41b935c 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -17,12 +17,24 @@
 
 """Unit test for the TestPipeline class"""
 
+import logging
 import unittest
+import mock
 
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.assert_that import assert_that as hc_assert_that
+
+from apache_beam.internal import pickler
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.utils.pipeline_options import PipelineOptions
 
 
+# A simple matcher that is used for testing extra options appending.
+class SimpleMatcher(BaseMatcher):
+  def _matches(self, item):
+    return True
+
+
 class TestPipelineTest(unittest.TestCase):
 
   TEST_CASE = {'options':
@@ -32,9 +44,6 @@ class TestPipelineTest(unittest.TestCase):
                                  'male': True,
                                  'age': 1}}
 
-  def setUp(self):
-    self.pipeline = TestPipeline()
-
   # Used for testing pipeline option creation.
   class TestParsingOptions(PipelineOptions):
 
@@ -45,19 +54,57 @@ class TestPipelineTest(unittest.TestCase):
       parser.add_argument('--age', action='store', type=int, help='mock age')
 
   def test_option_args_parsing(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
     self.assertListEqual(
-        self.pipeline.get_test_option_args(argv=self.TEST_CASE['options']),
+        test_pipeline.get_full_options_as_args(),
         self.TEST_CASE['expected_list'])
 
+  def test_empty_option_args_parsing(self):
+    test_pipeline = TestPipeline()
+    self.assertListEqual([],
+                         test_pipeline.get_full_options_as_args())
+
   def test_create_test_pipeline_options(self):
-    test_options = PipelineOptions(
-        self.pipeline.get_test_option_args(self.TEST_CASE['options']))
-    self.assertDictContainsSubset(
-        self.TEST_CASE['expected_dict'], test_options.get_all_options())
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
+    self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
+                                  test_options.get_all_options())
+
+  EXTRA_OPT_CASES = [
+      {'options': {'name': 'Mark'},
+       'expected': ['--name=Mark']},
+      {'options': {'student': True},
+       'expected': ['--student']},
+      {'options': {'student': False},
+       'expected': []}
+  ]
 
   def test_append_extra_options(self):
-    extra_opt = {'name': 'Mark'}
-    options_list = self.pipeline.get_test_option_args(
-        argv=self.TEST_CASE['options'], **extra_opt)
-    expected_list = self.TEST_CASE['expected_list'] + ['--name=Mark']
-    self.assertListEqual(expected_list, options_list)
+    test_pipeline = TestPipeline()
+    for case in self.EXTRA_OPT_CASES:
+      opt_list = test_pipeline.get_full_options_as_args(**case['options'])
+      self.assertListEqual(opt_list, case['expected'])
+
+  def test_append_verifier_in_extra_opt(self):
+    extra_opt = {'matcher': SimpleMatcher()}
+    opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
+    matcher = pickler.loads(opt_list[0].split('=')[1])
+    self.assertTrue(isinstance(matcher, BaseMatcher))
+    hc_assert_that(mock.Mock(), matcher)
+
+  def test_get_option(self):
+    name, value = ('job', 'mockJob')
+    test_pipeline = TestPipeline()
+    test_pipeline.options_list = ['--%s=%s' % (name, value)]
+    self.assertEqual(test_pipeline.get_option(name), value)
+
+  def test_skip_IT(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    test_pipeline.run()
+    # Note that this will never be reached since it should be skipped above.
+    self.fail()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 1a6dd45..efcfbdf 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,9 +22,16 @@ of test pipeline job. Customized verifier should extend
 `hamcrest.core.base_matcher.BaseMatcher` and override _matches.
 """
 
+import hashlib
+import logging
+
+from apache_beam.io.fileio import TextFileSource
 from apache_beam.runners.runner import PipelineState
+from apache_beam.utils import retry
 from hamcrest.core.base_matcher import BaseMatcher
 
+MAX_RETRIES = 4
+
 
 class PipelineStateMatcher(BaseMatcher):
   """Matcher that verify pipeline job terminated in expected state
@@ -48,3 +55,63 @@ class PipelineStateMatcher(BaseMatcher):
     mismatch_description \
       .append_text("Test pipeline job terminated in state: ") \
       .append_text(pipeline_result.current_state())
+
+
+def retry_on_fileio_error(exception):
+  """Filter allowing retries on file I/O errors."""
+  if isinstance(exception, RuntimeError) or \
+          isinstance(exception, IOError):
+    # GCS I/O raises RuntimeError and local filesystem I/O
+    # raises IOError when file reading is failed.
+    return True
+  else:
+    return False
+
+
+class FileChecksumMatcher(BaseMatcher):
+  """Matcher that verifies file(s) content by comparing file checksum.
+
+  Use fileio to fetch file(s) from given path. Currently, fileio supports
+  local filesystem and GCS.
+
+  File checksum is a SHA-1 hash computed from content of file(s).
+  """
+
+  def __init__(self, file_path, expected_checksum):
+    self.file_path = file_path
+    self.expected_checksum = expected_checksum
+
+  @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
+                                  retry_filter=retry_on_fileio_error)
+  def _read_with_retry(self):
+    """Read path with retry if I/O failed"""
+    source = TextFileSource(self.file_path)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    return read_lines
+
+  def _matches(self, _):
+    # Read from given file(s) path
+    read_lines = self._read_with_retry()
+
+    # Compute checksum
+    read_lines.sort()
+    m = hashlib.new('sha1')
+    for line in read_lines:
+      m.update(line)
+    self.checksum, num_lines = (m.hexdigest(), len(read_lines))
+    logging.info('Read from given path %s, %d lines, checksum: %s.',
+                 self.file_path, num_lines, self.checksum)
+    return self.checksum == self.expected_checksum
+
+  def describe_to(self, description):
+    description \
+      .append_text("Expected checksum is ") \
+      .append_text(self.expected_checksum)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Actual checksum is ") \
+      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 9f70b45..91dedad 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -18,27 +18,37 @@
 """Unit tests for the test pipeline verifiers"""
 
 import logging
+import tempfile
 import unittest
 
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.fileio import TextFileSource
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from hamcrest import assert_that as hc_assert_that
+from apache_beam.tests import pipeline_verifiers as verifiers
+from apache_beam.tests.test_utils import patch_retry
 
 
 class PipelineVerifiersTest(unittest.TestCase):
 
+  def setUp(self):
+    self._mock_result = Mock()
+    patch_retry(self, verifiers)
+
   def test_pipeline_state_matcher_success(self):
     """Test PipelineStateMatcher successes when using default expected state
     and job actually finished in DONE
     """
     pipeline_result = PipelineResult(PipelineState.DONE)
-    hc_assert_that(pipeline_result, PipelineStateMatcher())
+    hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
 
   def test_pipeline_state_matcher_given_state(self):
     """Test PipelineStateMatcher successes when matches given state"""
     pipeline_result = PipelineResult(PipelineState.FAILED)
-    hc_assert_that(pipeline_result, PipelineStateMatcher(PipelineState.FAILED))
+    hc_assert_that(pipeline_result,
+                   verifiers.PipelineStateMatcher(PipelineState.FAILED))
 
   def test_pipeline_state_matcher_fails(self):
     """Test PipelineStateMatcher fails when using default expected state
@@ -53,7 +63,52 @@ class PipelineVerifiersTest(unittest.TestCase):
     for state in failed_state:
       pipeline_result = PipelineResult(state)
       with self.assertRaises(AssertionError):
-        hc_assert_that(pipeline_result, PipelineStateMatcher())
+        hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+  test_cases = [
+      {'content': 'Test FileChecksumMatcher with single file',
+       'num_files': 1,
+       'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
+      {'content': 'Test FileChecksumMatcher with multiple files',
+       'num_files': 3,
+       'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
+      {'content': '',
+       'num_files': 1,
+       'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
+  ]
+
+  def create_temp_file(self, content, directory=None):
+    with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
+      f.write(content)
+      return f.name
+
+  def test_file_checksum_matcher_success(self):
+    for case in self.test_cases:
+      temp_dir = tempfile.mkdtemp()
+      for _ in range(case['num_files']):
+        self.create_temp_file(case['content'], temp_dir)
+      matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+                                              case['expected_checksum'])
+      hc_assert_that(self._mock_result, matcher)
+
+  @patch.object(TextFileSource, 'reader')
+  def test_file_checksum_matcher_read_failed(self, mock_reader):
+    mock_reader.side_effect = IOError('No file found.')
+    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+    with self.assertRaises(IOError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_reader.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+
+  @patch.object(TextFileSource, 'reader')
+  def test_file_checksum_matcher_service_error(self, mock_reader):
+    mock_reader.side_effect = RuntimeError('GCS service failed.')
+    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+    with self.assertRaises(RuntimeError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_reader.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
new file mode 100644
index 0000000..fc99fe9
--- /dev/null
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility methods for testing"""
+
+import imp
+from mock import Mock, patch
+
+from apache_beam.utils import retry
+
+
+def patch_retry(cls, module):
+  """A function to patch retry module to use mock clock and logger."""
+  real_retry_with_exponential_backoff = retry.with_exponential_backoff
+
+  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+    """A patch for retry decorator to use a mock dummy clock and logger."""
+    return real_retry_with_exponential_backoff(
+        num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
+        clock=Mock())
+
+  patch.object(retry, 'with_exponential_backoff',
+               side_effect=patched_retry_with_exponential_backoff).start()
+
+  # Reload module after patching.
+  imp.reload(module)
+
+  def kill_patches():
+    patch.stopall()
+    # Reload module again after removing patch.
+    imp.reload(module)
+
+  cls.addCleanup(kill_patches)


[3/3] beam git commit: Closes #1756

Posted by ro...@apache.org.
Closes #1756


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e23c3cab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e23c3cab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e23c3cab

Branch: refs/heads/python-sdk
Commit: e23c3cab3b9ad261c28fb4049de30d4b05522bfb
Parents: 4ba0b60 c955ad6
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jan 12 12:17:09 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:09 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   | 24 +++++--
 .../apache_beam/io/datastore/v1/helper_test.py  | 31 +-------
 sdks/python/apache_beam/test_pipeline.py        | 67 +++++++++++-------
 sdks/python/apache_beam/test_pipeline_test.py   | 74 ++++++++++++++++----
 .../apache_beam/tests/pipeline_verifiers.py     | 71 ++++++++++++++++++-
 .../tests/pipeline_verifiers_test.py            | 68 ++++++++++++++++--
 sdks/python/apache_beam/tests/test_utils.py     | 57 +++++++++++++++
 7 files changed, 316 insertions(+), 76 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: [BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource

Posted by ro...@apache.org.
[BEAM-1188] Use fileio.ChannelFactory instead of TextFileSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c955ad63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c955ad63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c955ad63

Branch: refs/heads/python-sdk
Commit: c955ad639e04687559dff3279b75a6e75934313d
Parents: feca7cf
Author: Mark Liu <ma...@google.com>
Authored: Wed Jan 11 15:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:17:08 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   |  7 ++--
 sdks/python/apache_beam/test_pipeline.py        |  2 +-
 sdks/python/apache_beam/test_pipeline_test.py   |  7 ++--
 .../apache_beam/tests/pipeline_verifiers.py     | 36 +++++++++++---------
 .../tests/pipeline_verifiers_test.py            | 31 +++++++++--------
 sdks/python/apache_beam/tests/test_utils.py     | 18 +++++++---
 6 files changed, 59 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index bf63688..77926bb 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,7 +20,6 @@
 import logging
 import unittest
 
-from datetime import datetime as dt
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
@@ -32,7 +31,9 @@ from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
 
 class WordCountIT(unittest.TestCase):
 
-  DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+  # The default checksum is a SHA-1 hash generated from a sorted list of
+  # lines read from expected output.
+  DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
 
   @attr('IT')
   def test_wordcount_it(self):
@@ -40,7 +41,7 @@ class WordCountIT(unittest.TestCase):
 
     # Set extra options to the pipeline for test purpose
     output = '/'.join([test_pipeline.get_option('output'),
-                       dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+                       test_pipeline.get_option('job_name'),
                        'results'])
     pipeline_verifiers = [PipelineStateMatcher(),
                           FileChecksumMatcher(output + '*-of-*',

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69040d1..69f4ddd 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -117,7 +117,7 @@ class TestPipeline(Pipeline):
 
     Append extra pipeline options to existing option list if provided.
     Test verifier (if contains in extra options) should be pickled before
-    append, and will be unpickled later in TestRunner.
+    appending, and will be unpickled later in the TestRunner.
     """
     options = list(self.options_list)
     for k, v in extra_opts.items():

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 41b935c..526a8a2 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -19,7 +19,6 @@
 
 import logging
 import unittest
-import mock
 
 from hamcrest.core.base_matcher import BaseMatcher
 from hamcrest.core.assert_that import assert_that as hc_assert_that
@@ -76,7 +75,9 @@ class TestPipelineTest(unittest.TestCase):
       {'options': {'student': True},
        'expected': ['--student']},
       {'options': {'student': False},
-       'expected': []}
+       'expected': []},
+      {'options': {'name': 'Mark', 'student': True},
+       'expected': ['--name=Mark', '--student']}
   ]
 
   def test_append_extra_options(self):
@@ -90,7 +91,7 @@ class TestPipelineTest(unittest.TestCase):
     opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
     matcher = pickler.loads(opt_list[0].split('=')[1])
     self.assertTrue(isinstance(matcher, BaseMatcher))
-    hc_assert_that(mock.Mock(), matcher)
+    hc_assert_that(None, matcher)
 
   def test_get_option(self):
     name, value = ('job', 'mockJob')

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index efcfbdf..6bf8d48 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -25,10 +25,12 @@ of test pipeline job. Customized verifier should extend
 import hashlib
 import logging
 
-from apache_beam.io.fileio import TextFileSource
+from apitools.base.py.exceptions import HttpError
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
 from apache_beam.utils import retry
-from hamcrest.core.base_matcher import BaseMatcher
 
 MAX_RETRIES = 4
 
@@ -57,12 +59,10 @@ class PipelineStateMatcher(BaseMatcher):
       .append_text(pipeline_result.current_state())
 
 
-def retry_on_fileio_error(exception):
-  """Filter allowing retries on file I/O errors."""
-  if isinstance(exception, RuntimeError) or \
+def retry_on_io_error_and_server_error(exception):
+  """Filter allowing retries on file I/O errors and service errors."""
+  if isinstance(exception, HttpError) or \
           isinstance(exception, IOError):
-    # GCS I/O raises RuntimeError and local filesystem I/O
-    # raises IOError when file reading is failed.
     return True
   else:
     return False
@@ -71,25 +71,27 @@ def retry_on_fileio_error(exception):
 class FileChecksumMatcher(BaseMatcher):
   """Matcher that verifies file(s) content by comparing file checksum.
 
-  Use fileio to fetch file(s) from given path. Currently, fileio supports
-  local filesystem and GCS.
-
-  File checksum is a SHA-1 hash computed from content of file(s).
+  Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
+  is a SHA-1 hash computed from content of file(s).
   """
 
   def __init__(self, file_path, expected_checksum):
     self.file_path = file_path
     self.expected_checksum = expected_checksum
 
-  @retry.with_exponential_backoff(num_retries=MAX_RETRIES,
-                                  retry_filter=retry_on_fileio_error)
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_io_error_and_server_error)
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
-    source = TextFileSource(self.file_path)
     read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
+    matched_path = ChannelFactory.glob(self.file_path)
+    if not matched_path:
+      raise IOError('No such file or directory: %s' % self.file_path)
+    for path in matched_path:
+      with ChannelFactory.open(path, 'r') as f:
+        for line in f:
+          read_lines.append(line)
     return read_lines
 
   def _matches(self, _):

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 91dedad..84a4dbe 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -21,10 +21,11 @@ import logging
 import tempfile
 import unittest
 
+from apitools.base.py.exceptions import HttpError
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
-from apache_beam.io.fileio import TextFileSource
+from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
@@ -91,23 +92,25 @@ class PipelineVerifiersTest(unittest.TestCase):
                                               case['expected_checksum'])
       hc_assert_that(self._mock_result, matcher)
 
-  @patch.object(TextFileSource, 'reader')
-  def test_file_checksum_matcher_read_failed(self, mock_reader):
-    mock_reader.side_effect = IOError('No file found.')
+  @patch.object(ChannelFactory, 'glob')
+  def test_file_checksum_matcher_read_failed(self, mock_glob):
+    mock_glob.side_effect = IOError('No file found.')
     matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
     with self.assertRaises(IOError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_reader.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
-
-  @patch.object(TextFileSource, 'reader')
-  def test_file_checksum_matcher_service_error(self, mock_reader):
-    mock_reader.side_effect = RuntimeError('GCS service failed.')
-    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
-    with self.assertRaises(RuntimeError):
+    self.assertTrue(mock_glob.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+
+  @patch.object(ChannelFactory, 'glob')
+  def test_file_checksum_matcher_service_error(self, mock_glob):
+    mock_glob.side_effect = HttpError(
+        response={'status': '404'}, url='', content='Not Found',
+    )
+    matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+    with self.assertRaises(HttpError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_reader.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count)
+    self.assertTrue(mock_glob.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/c955ad63/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index fc99fe9..3fdfe88 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -23,8 +23,18 @@ from mock import Mock, patch
 from apache_beam.utils import retry
 
 
-def patch_retry(cls, module):
-  """A function to patch retry module to use mock clock and logger."""
+def patch_retry(testcase, module):
+  """A function to patch retry module to use mock clock and logger.
+
+  The clock and logger defined in the retry decorator are replaced in tests
+  in order to skip the sleep phase when a retry happens.
+
+  Args:
+    testcase: An instance of unittest.TestCase that calls this function to
+      patch retry module.
+    module: The module that uses retry and needs to be patched with a mock
+      clock and logger in tests.
+  """
   real_retry_with_exponential_backoff = retry.with_exponential_backoff
 
   def patched_retry_with_exponential_backoff(num_retries, retry_filter):
@@ -39,9 +49,9 @@ def patch_retry(cls, module):
   # Reload module after patching.
   imp.reload(module)
 
-  def kill_patches():
+  def remove_patches():
     patch.stopall()
     # Reload module again after removing patch.
     imp.reload(module)
 
-  cls.addCleanup(kill_patches)
+  testcase.addCleanup(remove_patches)