You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:26 UTC

[31/50] [abbrv] beam git commit: Update assertions of source_test_utils from camelcase to underscore-separated.

Update assertions of source_test_utils from camelcase to underscore-separated.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f66fbdc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f66fbdc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f66fbdc

Branch: refs/heads/jstorm-runner
Commit: 1f66fbdce2187008c9e0ab535f0de3d69146d48b
Parents: 85cfd0c
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Apr 13 18:57:04 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Apr 17 15:01:21 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py       | 14 ++--
 .../python/apache_beam/io/concat_source_test.py | 12 ++--
 sdks/python/apache_beam/io/source_test_utils.py | 72 ++++++++++----------
 .../apache_beam/io/source_test_utils_test.py    | 20 +++---
 sdks/python/apache_beam/io/textio_test.py       | 18 ++---
 .../apache_beam/transforms/create_test.py       | 18 ++---
 6 files changed, 76 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 8b14443..5f2db62 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -142,10 +142,10 @@ class TestAvro(unittest.TestCase):
           (split.source, split.start_position, split.stop_position)
           for split in splits
       ]
-      source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
-                                                          sources_info)
+      source_test_utils.assert_sources_equal_reference_source(
+          (source, None, None), sources_info)
     else:
-      read_records = source_test_utils.readFromSource(source, None, None)
+      read_records = source_test_utils.read_from_source(source, None, None)
       self.assertItemsEqual(expected_result, read_records)
 
   def test_read_without_splitting(self):
@@ -228,7 +228,7 @@ class TestAvro(unittest.TestCase):
   def test_read_reentrant_without_splitting(self):
     file_name = self._write_data()
     source = AvroSource(file_name)
-    source_test_utils.assertReentrantReadsSucceed((source, None, None))
+    source_test_utils.assert_reentrant_reads_succeed((source, None, None))
 
   def test_read_reantrant_with_splitting(self):
     file_name = self._write_data()
@@ -236,7 +236,7 @@ class TestAvro(unittest.TestCase):
     splits = [
         split for split in source.split(desired_bundle_size=100000)]
     assert len(splits) == 1
-    source_test_utils.assertReentrantReadsSucceed(
+    source_test_utils.assert_reentrant_reads_succeed(
         (splits[0].source, splits[0].start_position, splits[0].stop_position))
 
   def test_read_without_splitting_multiple_blocks(self):
@@ -322,7 +322,7 @@ class TestAvro(unittest.TestCase):
       splits = [split
                 for split in source.split(desired_bundle_size=float('inf'))]
       assert len(splits) == 1
-      source_test_utils.assertSplitAtFractionExhaustive(splits[0].source)
+      source_test_utils.assert_split_at_fraction_exhaustive(splits[0].source)
     finally:
       avro.datafile.SYNC_INTERVAL = old_sync_interval
 
@@ -343,7 +343,7 @@ class TestAvro(unittest.TestCase):
 
     source = AvroSource(corrupted_file_name)
     with self.assertRaises(ValueError) as exn:
-      source_test_utils.readFromSource(source, None, None)
+      source_test_utils.read_from_source(source, None, None)
       self.assertEqual(0, exn.exception.message.find('Unexpected sync marker'))
 
   def test_source_transform(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 2cc4684..77d2647 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -83,7 +83,7 @@ class RangeSource(iobase.BoundedSource):
 class ConcatSourceTest(unittest.TestCase):
 
   def test_range_source(self):
-    source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+    source_test_utils.assert_split_at_fraction_exhaustive(RangeSource(0, 10, 3))
 
   def test_conact_source(self):
     source = ConcatSource([RangeSource(0, 4),
@@ -157,7 +157,7 @@ class ConcatSourceTest(unittest.TestCase):
     self.assertEquals(range_tracker.position_at_fraction(1), (3, None))
 
   def test_empty_source(self):
-    read_all = source_test_utils.readFromSource
+    read_all = source_test_utils.read_from_source
 
     empty = RangeSource(0, 0)
     self.assertEquals(read_all(ConcatSource([])), [])
@@ -174,7 +174,7 @@ class ConcatSourceTest(unittest.TestCase):
                       [])
 
   def test_single_source(self):
-    read_all = source_test_utils.readFromSource
+    read_all = source_test_utils.read_from_source
 
     range10 = RangeSource(0, 10)
     self.assertEquals(read_all(ConcatSource([range10])), range(10))
@@ -183,7 +183,7 @@ class ConcatSourceTest(unittest.TestCase):
                       range(5))
 
   def test_source_with_empty_ranges(self):
-    read_all = source_test_utils.readFromSource
+    read_all = source_test_utils.read_from_source
 
     empty = RangeSource(0, 0)
     self.assertEquals(read_all(empty), [])
@@ -206,7 +206,7 @@ class ConcatSourceTest(unittest.TestCase):
                            RangeSource(13, 17),
                            empty,
                           ])
-    source_test_utils.assertSplitAtFractionExhaustive(source)
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
 
   def test_run_concat_direct(self):
     source = ConcatSource([RangeSource(0, 10),
@@ -224,7 +224,7 @@ class ConcatSourceTest(unittest.TestCase):
                            RangeSource(100, 110),
                            RangeSource(1000, 1010),
                           ])
-    source_test_utils.assertSplitAtFractionExhaustive(source)
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 542e9f6..5584fa7 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -67,7 +67,7 @@ SplitFractionStatistics = namedtuple(
     'successful_fractions non_trivial_fractions')
 
 
-def readFromSource(source, start_position=None, stop_position=None):
+def read_from_source(source, start_position=None, stop_position=None):
   """Reads elements from the given ```BoundedSource```.
 
   Only reads elements within the given position range.
@@ -97,7 +97,7 @@ def _ThreadPool(threads):
   return ThreadPool(threads)
 
 
-def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
+def assert_sources_equal_reference_source(reference_source_info, sources_info):
   """Tests if a reference source is equal to a given set of sources.
 
   Given a reference source (a ``BoundedSource`` and a position range) and a
@@ -125,7 +125,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
                      'item of the tuple gives a '
                      'iobase.BoundedSource. Received: %r'
                      , reference_source_info)
-  reference_records = readFromSource(
+  reference_records = read_from_source(
       *reference_source_info)
 
   source_records = []
@@ -147,7 +147,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
           reference_source_info[0], source_info[0],
           type(reference_source_info[0].default_output_coder()),
           type(source_info[0].default_output_coder()))
-    source_records.extend(readFromSource(*source_info))
+    source_records.extend(read_from_source(*source_info))
 
   if len(reference_records) != len(source_records):
     raise ValueError(
@@ -161,7 +161,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
         'same set of records.')
 
 
-def assertReentrantReadsSucceed(source_info):
+def assert_reentrant_reads_succeed(source_info):
   """Tests if a given source can be read in a reentrant manner.
 
   Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
@@ -216,8 +216,8 @@ def assertReentrantReadsSucceed(source_info):
                        i, expected_values, reentrant_read)
 
 
-def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
-                                  split_fraction, expected_outcome):
+def assert_split_at_fraction_behavior(source, num_items_to_read_before_split,
+                                      split_fraction, expected_outcome):
   """Verifies the behaviour of splitting a source at a given fraction.
 
   Asserts that splitting a ``BoundedSource`` either fails after reading
@@ -237,13 +237,13 @@ def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
     source while the second value of the tuple will be '-1'.
   """
   assert isinstance(source, iobase.BoundedSource)
-  expected_items = readFromSource(source, None, None)
-  return _assertSplitAtFractionBehavior(
+  expected_items = read_from_source(source, None, None)
+  return _assert_split_at_fraction_behavior(
       source, expected_items, num_items_to_read_before_split, split_fraction,
       expected_outcome)
 
 
-def _assertSplitAtFractionBehavior(
+def _assert_split_at_fraction_behavior(
     source, expected_items, num_items_to_read_before_split,
     split_fraction, expected_outcome, start_position=None, stop_position=None):
 
@@ -307,7 +307,7 @@ def _assertSplitAtFractionBehavior(
   residual_range = (
       split_result[0], stop_position_before_split) if split_result else None
 
-  return _verifySingleSplitFractionResult(
+  return _verify_single_split_fraction_result(
       source, expected_items, current_items,
       split_result,
       (range_tracker.start_position(), range_tracker.stop_position()),
@@ -318,19 +318,19 @@ def _range_to_str(start, stop):
   return '[' + (str(start) + ',' + str(stop) + ')')
 
 
-def _verifySingleSplitFractionResult(
+def _verify_single_split_fraction_result(
     source, expected_items, current_items, split_successful, primary_range,
     residual_range, split_fraction):
 
   assert primary_range
-  primary_items = readFromSource(source, *primary_range)
+  primary_items = read_from_source(source, *primary_range)
 
   if not split_successful:
     # For unsuccessful splits, residual_range should be None.
     assert not residual_range
 
   residual_items = (
-      readFromSource(source, *residual_range)
+      read_from_source(source, *residual_range)
       if split_successful else [])
 
   total_items = primary_items + residual_items
@@ -359,9 +359,8 @@ def _verifySingleSplitFractionResult(
   return result
 
 
-def assertSplitAtFractionSucceedsAndConsistent(source,
-                                               num_items_to_read_before_split,
-                                               split_fraction):
+def assert_split_at_fraction_succeeds_and_consistent(
+    source, num_items_to_read_before_split, split_fraction):
   """Verifies some consistency properties of dynamic work rebalancing.
 
   Equivalent to the following pseudocode:::
@@ -392,13 +391,13 @@ def assertSplitAtFractionSucceedsAndConsistent(source,
     split_fraction: fraction to split at.
   """
 
-  assertSplitAtFractionBehavior(
+  assert_split_at_fraction_behavior(
       source, num_items_to_read_before_split, split_fraction,
       ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
 
 
-def assertSplitAtFractionFails(source, num_items_to_read_before_split,
-                               split_fraction):
+def assert_split_at_fraction_fails(source, num_items_to_read_before_split,
+                                   split_fraction):
   """Asserts that dynamic work rebalancing at a given fraction fails.
 
   Asserts that trying to perform dynamic splitting after reading
@@ -410,16 +409,15 @@ def assertSplitAtFractionFails(source, num_items_to_read_before_split,
     split_fraction: fraction to split at.
   """
 
-  assertSplitAtFractionBehavior(
+  assert_split_at_fraction_behavior(
       source, num_items_to_read_before_split, split_fraction,
       ExpectedSplitOutcome.MUST_FAIL)
 
 
-def assertSplitAtFractionBinary(source, expected_items,
-                                num_items_to_read_before_split, left_fraction,
-                                left_result,
-                                right_fraction, right_result, stats,
-                                start_position=None, stop_position=None):
+def assert_split_at_fraction_binary(
+    source, expected_items, num_items_to_read_before_split, left_fraction,
+    left_result, right_fraction, right_result, stats, start_position=None,
+    stop_position=None):
   """Performs dynamic work rebalancing for fractions within a given range.
 
   Asserts that given a start position, a source can be split at every
@@ -445,16 +443,16 @@ def assertSplitAtFractionBinary(source, expected_items,
   middle_fraction = (left_fraction + right_fraction) / 2
 
   if left_result is None:
-    left_result = _assertSplitAtFractionBehavior(
+    left_result = _assert_split_at_fraction_behavior(
         source, expected_items, num_items_to_read_before_split, left_fraction,
         ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
 
   if right_result is None:
-    right_result = _assertSplitAtFractionBehavior(
+    right_result = _assert_split_at_fraction_behavior(
         source, expected_items, num_items_to_read_before_split,
         right_fraction, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
 
-  middle_result = _assertSplitAtFractionBehavior(
+  middle_result = _assert_split_at_fraction_behavior(
       source, expected_items, num_items_to_read_before_split, middle_fraction,
       ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
 
@@ -468,7 +466,7 @@ def assertSplitAtFractionBinary(source, expected_items,
   # enough since the total number of records is constant).
 
   if left_result[0] != middle_result[0]:
-    assertSplitAtFractionBinary(
+    assert_split_at_fraction_binary(
         source, expected_items, num_items_to_read_before_split, left_fraction,
         left_result, middle_fraction, middle_result, stats)
 
@@ -477,7 +475,7 @@ def assertSplitAtFractionBinary(source, expected_items,
   # fraction 1.0, there might be fractions in range ('middle_fraction', 1.0)
   # where dynamic splitting succeeds).
   if right_fraction == 1.0 or middle_result[0] != right_result[0]:
-    assertSplitAtFractionBinary(
+    assert_split_at_fraction_binary(
         source, expected_items, num_items_to_read_before_split,
         middle_fraction, middle_result, right_fraction, right_result, stats)
 
@@ -485,7 +483,7 @@ MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100
 MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000
 
 
-def assertSplitAtFractionExhaustive(
+def assert_split_at_fraction_exhaustive(
     source, start_position=None, stop_position=None,
     perform_multi_threaded_test=True):
   """Performs and tests dynamic work rebalancing exhaustively.
@@ -504,7 +502,7 @@ def assertSplitAtFractionExhaustive(
     ValueError: if the exhaustive splitting test fails.
   """
 
-  expected_items = readFromSource(source, start_position, stop_position)
+  expected_items = read_from_source(source, start_position, stop_position)
   if not expected_items:
     raise ValueError('Source %r is empty.', source)
 
@@ -519,7 +517,7 @@ def assertSplitAtFractionExhaustive(
   for i in range(len(expected_items)):
     stats = SplitFractionStatistics([], [])
 
-    assertSplitAtFractionBinary(
+    assert_split_at_fraction_binary(
         source, expected_items, i, 0.0, None, 1.0, None, stats)
 
     if stats.successful_fractions:
@@ -571,7 +569,7 @@ def assertSplitAtFractionExhaustive(
           )
           break
 
-        if _assertSplitAtFractionConcurrent(
+        if _assert_split_at_fraction_concurrent(
             source, expected_items, i, min_non_trivial_fraction, thread_pool):
           have_success = True
         else:
@@ -595,7 +593,7 @@ def assertSplitAtFractionExhaustive(
                num_total_trials, len(expected_items))
 
 
-def _assertSplitAtFractionConcurrent(
+def _assert_split_at_fraction_concurrent(
     source, expected_items, num_items_to_read_before_splitting,
     split_fraction, thread_pool=None):
 
@@ -634,7 +632,7 @@ def _assertSplitAtFractionConcurrent(
   residual_range = (
       split_result[0], stop_position_before_split) if split_result else None
 
-  res = _verifySingleSplitFractionResult(
+  res = _verify_single_split_fraction_result(
       source, expected_items, current_items, split_result,
       primary_range, residual_range, split_fraction)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/source_test_utils_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py
index f6f9ec3..b822fc5 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -48,7 +48,7 @@ class SourceTestUtilsTest(unittest.TestCase):
     data = self._create_data(100)
     source = self._create_source(data)
     self.assertItemsEqual(
-        data, source_test_utils.readFromSource(source, None, None))
+        data, source_test_utils.read_from_source(source, None, None))
 
   def test_source_equals_reference_source(self):
     data = self._create_data(100)
@@ -60,25 +60,25 @@ class SourceTestUtilsTest(unittest.TestCase):
                        'bundles. Please adjust the test so that at least '
                        'two splits get generated.', len(sources_info))
 
-    source_test_utils.assertSourcesEqualReferenceSource(
+    source_test_utils.assert_sources_equal_reference_source(
         (reference_source, None, None), sources_info)
 
   def test_split_at_fraction_successful(self):
     data = self._create_data(100)
     source = self._create_source(data)
-    result1 = source_test_utils.assertSplitAtFractionBehavior(
+    result1 = source_test_utils.assert_split_at_fraction_behavior(
         source, 10, 0.5,
         source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
-    result2 = source_test_utils.assertSplitAtFractionBehavior(
+    result2 = source_test_utils.assert_split_at_fraction_behavior(
         source, 20, 0.5,
         source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
     self.assertEquals(result1, result2)
     self.assertEquals(100, result1[0] + result1[1])
 
-    result3 = source_test_utils.assertSplitAtFractionBehavior(
+    result3 = source_test_utils.assert_split_at_fraction_behavior(
         source, 30, 0.8,
         source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
-    result4 = source_test_utils.assertSplitAtFractionBehavior(
+    result4 = source_test_utils.assert_split_at_fraction_behavior(
         source, 50, 0.8,
         source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
     self.assertEquals(result3, result4)
@@ -91,13 +91,13 @@ class SourceTestUtilsTest(unittest.TestCase):
     data = self._create_data(100)
     source = self._create_source(data)
 
-    result = source_test_utils.assertSplitAtFractionBehavior(
+    result = source_test_utils.assert_split_at_fraction_behavior(
         source, 90, 0.1, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
     self.assertEquals(result[0], 100)
     self.assertEquals(result[1], -1)
 
     with self.assertRaises(ValueError):
-      source_test_utils.assertSplitAtFractionBehavior(
+      source_test_utils.assert_split_at_fraction_behavior(
           source, 10, 0.5, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
 
   def test_split_at_fraction_binary(self):
@@ -105,7 +105,7 @@ class SourceTestUtilsTest(unittest.TestCase):
     source = self._create_source(data)
 
     stats = source_test_utils.SplitFractionStatistics([], [])
-    source_test_utils.assertSplitAtFractionBinary(
+    source_test_utils.assert_split_at_fraction_binary(
         source, data, 10, 0.5, None, 0.8, None, stats)
 
     # These lists should not be empty now.
@@ -115,7 +115,7 @@ class SourceTestUtilsTest(unittest.TestCase):
   def test_split_at_fraction_exhaustive(self):
     data = self._create_data(10)
     source = self._create_source(data)
-    source_test_utils.assertSplitAtFractionExhaustive(source)
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index b3f4391..90dc665 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -255,7 +255,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     sources_info = ([
         (split.source, split.start_position, split.stop_position) for
         split in splits])
-    source_test_utils.assertSourcesEqualReferenceSource(
+    source_test_utils.assert_sources_equal_reference_source(
         reference_source_info, sources_info)
 
   def test_progress(self):
@@ -291,7 +291,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     assert len(expected_data) == 10
     source = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
                         coders.StrUtf8Coder())
-    source_test_utils.assertReentrantReadsSucceed((source, None, None))
+    source_test_utils.assert_reentrant_reads_succeed((source, None, None))
 
   def test_read_reentrant_after_splitting(self):
     file_name, expected_data = write_data(10)
@@ -300,7 +300,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
                         coders.StrUtf8Coder())
     splits = [split for split in source.split(desired_bundle_size=100000)]
     assert len(splits) == 1
-    source_test_utils.assertReentrantReadsSucceed(
+    source_test_utils.assert_reentrant_reads_succeed(
         (splits[0].source, splits[0].start_position, splits[0].stop_position))
 
   def test_dynamic_work_rebalancing(self):
@@ -310,7 +310,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
                         coders.StrUtf8Coder())
     splits = [split for split in source.split(desired_bundle_size=100000)]
     assert len(splits) == 1
-    source_test_utils.assertSplitAtFractionExhaustive(
+    source_test_utils.assert_split_at_fraction_exhaustive(
         splits[0].source, splits[0].start_position, splits[0].stop_position)
 
   def test_dynamic_work_rebalancing_windows_eol(self):
@@ -320,7 +320,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
                         coders.StrUtf8Coder())
     splits = [split for split in source.split(desired_bundle_size=100000)]
     assert len(splits) == 1
-    source_test_utils.assertSplitAtFractionExhaustive(
+    source_test_utils.assert_split_at_fraction_exhaustive(
         splits[0].source, splits[0].start_position, splits[0].stop_position,
         perform_multi_threaded_test=False)
 
@@ -331,7 +331,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
                         coders.StrUtf8Coder())
     splits = [split for split in source.split(desired_bundle_size=100000)]
     assert len(splits) == 1
-    source_test_utils.assertSplitAtFractionExhaustive(
+    source_test_utils.assert_split_at_fraction_exhaustive(
         splits[0].source, splits[0].start_position, splits[0].stop_position,
         perform_multi_threaded_test=False)
 
@@ -449,7 +449,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     sources_info = ([
         (split.source, split.start_position, split.stop_position) for
         split in splits])
-    source_test_utils.assertSourcesEqualReferenceSource(
+    source_test_utils.assert_sources_equal_reference_source(
         reference_source_info, sources_info)
 
   def test_read_gzip_empty_file(self):
@@ -561,10 +561,10 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
         (split.source, split.start_position, split.stop_position) for
         split in splits])
     self.assertGreater(len(sources_info), 1)
-    reference_lines = source_test_utils.readFromSource(*reference_source_info)
+    reference_lines = source_test_utils.read_from_source(*reference_source_info)
     split_lines = []
     for source_info in sources_info:
-      split_lines.extend(source_test_utils.readFromSource(*source_info))
+      split_lines.extend(source_test_utils.read_from_source(*source_info))
 
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)

http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index f4b1f07..2352acd 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -41,7 +41,7 @@ class CreateTest(unittest.TestCase):
 
   def check_read(self, values, coder):
     source = Create._create_source_from_iterable(values, coder)
-    read_values = source_test_utils.readFromSource(source)
+    read_values = source_test_utils.read_from_source(source)
     self.assertEqual(sorted(values), sorted(read_values))
 
   def test_create_source_read_with_initial_splits(self):
@@ -73,27 +73,27 @@ class CreateTest(unittest.TestCase):
     splits_info = [
         (split.source, split.start_position, split.stop_position)
         for split in splits]
-    source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
-                                                        splits_info)
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), splits_info)
 
   def test_create_source_read_reentrant(self):
     source = Create._create_source_from_iterable(range(9), self.coder)
-    source_test_utils.assertReentrantReadsSucceed((source, None, None))
+    source_test_utils.assert_reentrant_reads_succeed((source, None, None))
 
   def test_create_source_read_reentrant_with_initial_splits(self):
     source = Create._create_source_from_iterable(range(24), self.coder)
     for split in source.split(desired_bundle_size=5):
-      source_test_utils.assertReentrantReadsSucceed((split.source,
-                                                     split.start_position,
-                                                     split.stop_position))
+      source_test_utils.assert_reentrant_reads_succeed((split.source,
+                                                        split.start_position,
+                                                        split.stop_position))
 
   def test_create_source_dynamic_splitting(self):
     # 2 values
     source = Create._create_source_from_iterable(range(2), self.coder)
-    source_test_utils.assertSplitAtFractionExhaustive(source)
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
     # Multiple values.
     source = Create._create_source_from_iterable(range(11), self.coder)
-    source_test_utils.assertSplitAtFractionExhaustive(
+    source_test_utils.assert_split_at_fraction_exhaustive(
         source, perform_multi_threaded_test=True)
 
   def test_create_source_progress(self):