You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:26 UTC
[31/50] [abbrv] beam git commit: Update assertions of source_test_utils from camelcase to underscore-separated.
Update assertions of source_test_utils from camelcase to underscore-separated.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f66fbdc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f66fbdc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f66fbdc
Branch: refs/heads/jstorm-runner
Commit: 1f66fbdce2187008c9e0ab535f0de3d69146d48b
Parents: 85cfd0c
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Apr 13 18:57:04 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Apr 17 15:01:21 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio_test.py | 14 ++--
.../python/apache_beam/io/concat_source_test.py | 12 ++--
sdks/python/apache_beam/io/source_test_utils.py | 72 ++++++++++----------
.../apache_beam/io/source_test_utils_test.py | 20 +++---
sdks/python/apache_beam/io/textio_test.py | 18 ++---
.../apache_beam/transforms/create_test.py | 18 ++---
6 files changed, 76 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 8b14443..5f2db62 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -142,10 +142,10 @@ class TestAvro(unittest.TestCase):
(split.source, split.start_position, split.stop_position)
for split in splits
]
- source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
- sources_info)
+ source_test_utils.assert_sources_equal_reference_source(
+ (source, None, None), sources_info)
else:
- read_records = source_test_utils.readFromSource(source, None, None)
+ read_records = source_test_utils.read_from_source(source, None, None)
self.assertItemsEqual(expected_result, read_records)
def test_read_without_splitting(self):
@@ -228,7 +228,7 @@ class TestAvro(unittest.TestCase):
def test_read_reentrant_without_splitting(self):
file_name = self._write_data()
source = AvroSource(file_name)
- source_test_utils.assertReentrantReadsSucceed((source, None, None))
+ source_test_utils.assert_reentrant_reads_succeed((source, None, None))
def test_read_reantrant_with_splitting(self):
file_name = self._write_data()
@@ -236,7 +236,7 @@ class TestAvro(unittest.TestCase):
splits = [
split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
- source_test_utils.assertReentrantReadsSucceed(
+ source_test_utils.assert_reentrant_reads_succeed(
(splits[0].source, splits[0].start_position, splits[0].stop_position))
def test_read_without_splitting_multiple_blocks(self):
@@ -322,7 +322,7 @@ class TestAvro(unittest.TestCase):
splits = [split
for split in source.split(desired_bundle_size=float('inf'))]
assert len(splits) == 1
- source_test_utils.assertSplitAtFractionExhaustive(splits[0].source)
+ source_test_utils.assert_split_at_fraction_exhaustive(splits[0].source)
finally:
avro.datafile.SYNC_INTERVAL = old_sync_interval
@@ -343,7 +343,7 @@ class TestAvro(unittest.TestCase):
source = AvroSource(corrupted_file_name)
with self.assertRaises(ValueError) as exn:
- source_test_utils.readFromSource(source, None, None)
+ source_test_utils.read_from_source(source, None, None)
self.assertEqual(0, exn.exception.message.find('Unexpected sync marker'))
def test_source_transform(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 2cc4684..77d2647 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -83,7 +83,7 @@ class RangeSource(iobase.BoundedSource):
class ConcatSourceTest(unittest.TestCase):
def test_range_source(self):
- source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+ source_test_utils.assert_split_at_fraction_exhaustive(RangeSource(0, 10, 3))
def test_conact_source(self):
source = ConcatSource([RangeSource(0, 4),
@@ -157,7 +157,7 @@ class ConcatSourceTest(unittest.TestCase):
self.assertEquals(range_tracker.position_at_fraction(1), (3, None))
def test_empty_source(self):
- read_all = source_test_utils.readFromSource
+ read_all = source_test_utils.read_from_source
empty = RangeSource(0, 0)
self.assertEquals(read_all(ConcatSource([])), [])
@@ -174,7 +174,7 @@ class ConcatSourceTest(unittest.TestCase):
[])
def test_single_source(self):
- read_all = source_test_utils.readFromSource
+ read_all = source_test_utils.read_from_source
range10 = RangeSource(0, 10)
self.assertEquals(read_all(ConcatSource([range10])), range(10))
@@ -183,7 +183,7 @@ class ConcatSourceTest(unittest.TestCase):
range(5))
def test_source_with_empty_ranges(self):
- read_all = source_test_utils.readFromSource
+ read_all = source_test_utils.read_from_source
empty = RangeSource(0, 0)
self.assertEquals(read_all(empty), [])
@@ -206,7 +206,7 @@ class ConcatSourceTest(unittest.TestCase):
RangeSource(13, 17),
empty,
])
- source_test_utils.assertSplitAtFractionExhaustive(source)
+ source_test_utils.assert_split_at_fraction_exhaustive(source)
def test_run_concat_direct(self):
source = ConcatSource([RangeSource(0, 10),
@@ -224,7 +224,7 @@ class ConcatSourceTest(unittest.TestCase):
RangeSource(100, 110),
RangeSource(1000, 1010),
])
- source_test_utils.assertSplitAtFractionExhaustive(source)
+ source_test_utils.assert_split_at_fraction_exhaustive(source)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 542e9f6..5584fa7 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -67,7 +67,7 @@ SplitFractionStatistics = namedtuple(
'successful_fractions non_trivial_fractions')
-def readFromSource(source, start_position=None, stop_position=None):
+def read_from_source(source, start_position=None, stop_position=None):
"""Reads elements from the given ```BoundedSource```.
Only reads elements within the given position range.
@@ -97,7 +97,7 @@ def _ThreadPool(threads):
return ThreadPool(threads)
-def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
+def assert_sources_equal_reference_source(reference_source_info, sources_info):
"""Tests if a reference source is equal to a given set of sources.
Given a reference source (a ``BoundedSource`` and a position range) and a
@@ -125,7 +125,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
'item of the tuple gives a '
'iobase.BoundedSource. Received: %r'
, reference_source_info)
- reference_records = readFromSource(
+ reference_records = read_from_source(
*reference_source_info)
source_records = []
@@ -147,7 +147,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
reference_source_info[0], source_info[0],
type(reference_source_info[0].default_output_coder()),
type(source_info[0].default_output_coder()))
- source_records.extend(readFromSource(*source_info))
+ source_records.extend(read_from_source(*source_info))
if len(reference_records) != len(source_records):
raise ValueError(
@@ -161,7 +161,7 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
'same set of records.')
-def assertReentrantReadsSucceed(source_info):
+def assert_reentrant_reads_succeed(source_info):
"""Tests if a given source can be read in a reentrant manner.
Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
@@ -216,8 +216,8 @@ def assertReentrantReadsSucceed(source_info):
i, expected_values, reentrant_read)
-def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
- split_fraction, expected_outcome):
+def assert_split_at_fraction_behavior(source, num_items_to_read_before_split,
+ split_fraction, expected_outcome):
"""Verifies the behaviour of splitting a source at a given fraction.
Asserts that splitting a ``BoundedSource`` either fails after reading
@@ -237,13 +237,13 @@ def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
source while the second value of the tuple will be '-1'.
"""
assert isinstance(source, iobase.BoundedSource)
- expected_items = readFromSource(source, None, None)
- return _assertSplitAtFractionBehavior(
+ expected_items = read_from_source(source, None, None)
+ return _assert_split_at_fraction_behavior(
source, expected_items, num_items_to_read_before_split, split_fraction,
expected_outcome)
-def _assertSplitAtFractionBehavior(
+def _assert_split_at_fraction_behavior(
source, expected_items, num_items_to_read_before_split,
split_fraction, expected_outcome, start_position=None, stop_position=None):
@@ -307,7 +307,7 @@ def _assertSplitAtFractionBehavior(
residual_range = (
split_result[0], stop_position_before_split) if split_result else None
- return _verifySingleSplitFractionResult(
+ return _verify_single_split_fraction_result(
source, expected_items, current_items,
split_result,
(range_tracker.start_position(), range_tracker.stop_position()),
@@ -318,19 +318,19 @@ def _range_to_str(start, stop):
return '[' + (str(start) + ',' + str(stop) + ')')
-def _verifySingleSplitFractionResult(
+def _verify_single_split_fraction_result(
source, expected_items, current_items, split_successful, primary_range,
residual_range, split_fraction):
assert primary_range
- primary_items = readFromSource(source, *primary_range)
+ primary_items = read_from_source(source, *primary_range)
if not split_successful:
# For unsuccessful splits, residual_range should be None.
assert not residual_range
residual_items = (
- readFromSource(source, *residual_range)
+ read_from_source(source, *residual_range)
if split_successful else [])
total_items = primary_items + residual_items
@@ -359,9 +359,8 @@ def _verifySingleSplitFractionResult(
return result
-def assertSplitAtFractionSucceedsAndConsistent(source,
- num_items_to_read_before_split,
- split_fraction):
+def assert_split_at_fraction_succeeds_and_consistent(
+ source, num_items_to_read_before_split, split_fraction):
"""Verifies some consistency properties of dynamic work rebalancing.
Equivalent to the following pseudocode:::
@@ -392,13 +391,13 @@ def assertSplitAtFractionSucceedsAndConsistent(source,
split_fraction: fraction to split at.
"""
- assertSplitAtFractionBehavior(
+ assert_split_at_fraction_behavior(
source, num_items_to_read_before_split, split_fraction,
ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
-def assertSplitAtFractionFails(source, num_items_to_read_before_split,
- split_fraction):
+def assert_split_at_fraction_fails(source, num_items_to_read_before_split,
+ split_fraction):
"""Asserts that dynamic work rebalancing at a given fraction fails.
Asserts that trying to perform dynamic splitting after reading
@@ -410,16 +409,15 @@ def assertSplitAtFractionFails(source, num_items_to_read_before_split,
split_fraction: fraction to split at.
"""
- assertSplitAtFractionBehavior(
+ assert_split_at_fraction_behavior(
source, num_items_to_read_before_split, split_fraction,
ExpectedSplitOutcome.MUST_FAIL)
-def assertSplitAtFractionBinary(source, expected_items,
- num_items_to_read_before_split, left_fraction,
- left_result,
- right_fraction, right_result, stats,
- start_position=None, stop_position=None):
+def assert_split_at_fraction_binary(
+ source, expected_items, num_items_to_read_before_split, left_fraction,
+ left_result, right_fraction, right_result, stats, start_position=None,
+ stop_position=None):
"""Performs dynamic work rebalancing for fractions within a given range.
Asserts that given a start position, a source can be split at every
@@ -445,16 +443,16 @@ def assertSplitAtFractionBinary(source, expected_items,
middle_fraction = (left_fraction + right_fraction) / 2
if left_result is None:
- left_result = _assertSplitAtFractionBehavior(
+ left_result = _assert_split_at_fraction_behavior(
source, expected_items, num_items_to_read_before_split, left_fraction,
ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
if right_result is None:
- right_result = _assertSplitAtFractionBehavior(
+ right_result = _assert_split_at_fraction_behavior(
source, expected_items, num_items_to_read_before_split,
right_fraction, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
- middle_result = _assertSplitAtFractionBehavior(
+ middle_result = _assert_split_at_fraction_behavior(
source, expected_items, num_items_to_read_before_split, middle_fraction,
ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS)
@@ -468,7 +466,7 @@ def assertSplitAtFractionBinary(source, expected_items,
# enough since the total number of records is constant).
if left_result[0] != middle_result[0]:
- assertSplitAtFractionBinary(
+ assert_split_at_fraction_binary(
source, expected_items, num_items_to_read_before_split, left_fraction,
left_result, middle_fraction, middle_result, stats)
@@ -477,7 +475,7 @@ def assertSplitAtFractionBinary(source, expected_items,
# fraction 1.0, there might be fractions in range ('middle_fraction', 1.0)
# where dynamic splitting succeeds).
if right_fraction == 1.0 or middle_result[0] != right_result[0]:
- assertSplitAtFractionBinary(
+ assert_split_at_fraction_binary(
source, expected_items, num_items_to_read_before_split,
middle_fraction, middle_result, right_fraction, right_result, stats)
@@ -485,7 +483,7 @@ MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100
MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000
-def assertSplitAtFractionExhaustive(
+def assert_split_at_fraction_exhaustive(
source, start_position=None, stop_position=None,
perform_multi_threaded_test=True):
"""Performs and tests dynamic work rebalancing exhaustively.
@@ -504,7 +502,7 @@ def assertSplitAtFractionExhaustive(
ValueError: if the exhaustive splitting test fails.
"""
- expected_items = readFromSource(source, start_position, stop_position)
+ expected_items = read_from_source(source, start_position, stop_position)
if not expected_items:
raise ValueError('Source %r is empty.', source)
@@ -519,7 +517,7 @@ def assertSplitAtFractionExhaustive(
for i in range(len(expected_items)):
stats = SplitFractionStatistics([], [])
- assertSplitAtFractionBinary(
+ assert_split_at_fraction_binary(
source, expected_items, i, 0.0, None, 1.0, None, stats)
if stats.successful_fractions:
@@ -571,7 +569,7 @@ def assertSplitAtFractionExhaustive(
)
break
- if _assertSplitAtFractionConcurrent(
+ if _assert_split_at_fraction_concurrent(
source, expected_items, i, min_non_trivial_fraction, thread_pool):
have_success = True
else:
@@ -595,7 +593,7 @@ def assertSplitAtFractionExhaustive(
num_total_trials, len(expected_items))
-def _assertSplitAtFractionConcurrent(
+def _assert_split_at_fraction_concurrent(
source, expected_items, num_items_to_read_before_splitting,
split_fraction, thread_pool=None):
@@ -634,7 +632,7 @@ def _assertSplitAtFractionConcurrent(
residual_range = (
split_result[0], stop_position_before_split) if split_result else None
- res = _verifySingleSplitFractionResult(
+ res = _verify_single_split_fraction_result(
source, expected_items, current_items, split_result,
primary_range, residual_range, split_fraction)
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/source_test_utils_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py
index f6f9ec3..b822fc5 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -48,7 +48,7 @@ class SourceTestUtilsTest(unittest.TestCase):
data = self._create_data(100)
source = self._create_source(data)
self.assertItemsEqual(
- data, source_test_utils.readFromSource(source, None, None))
+ data, source_test_utils.read_from_source(source, None, None))
def test_source_equals_reference_source(self):
data = self._create_data(100)
@@ -60,25 +60,25 @@ class SourceTestUtilsTest(unittest.TestCase):
'bundles. Please adjust the test so that at least '
'two splits get generated.', len(sources_info))
- source_test_utils.assertSourcesEqualReferenceSource(
+ source_test_utils.assert_sources_equal_reference_source(
(reference_source, None, None), sources_info)
def test_split_at_fraction_successful(self):
data = self._create_data(100)
source = self._create_source(data)
- result1 = source_test_utils.assertSplitAtFractionBehavior(
+ result1 = source_test_utils.assert_split_at_fraction_behavior(
source, 10, 0.5,
source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
- result2 = source_test_utils.assertSplitAtFractionBehavior(
+ result2 = source_test_utils.assert_split_at_fraction_behavior(
source, 20, 0.5,
source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
self.assertEquals(result1, result2)
self.assertEquals(100, result1[0] + result1[1])
- result3 = source_test_utils.assertSplitAtFractionBehavior(
+ result3 = source_test_utils.assert_split_at_fraction_behavior(
source, 30, 0.8,
source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
- result4 = source_test_utils.assertSplitAtFractionBehavior(
+ result4 = source_test_utils.assert_split_at_fraction_behavior(
source, 50, 0.8,
source_test_utils.ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT)
self.assertEquals(result3, result4)
@@ -91,13 +91,13 @@ class SourceTestUtilsTest(unittest.TestCase):
data = self._create_data(100)
source = self._create_source(data)
- result = source_test_utils.assertSplitAtFractionBehavior(
+ result = source_test_utils.assert_split_at_fraction_behavior(
source, 90, 0.1, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
self.assertEquals(result[0], 100)
self.assertEquals(result[1], -1)
with self.assertRaises(ValueError):
- source_test_utils.assertSplitAtFractionBehavior(
+ source_test_utils.assert_split_at_fraction_behavior(
source, 10, 0.5, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
def test_split_at_fraction_binary(self):
@@ -105,7 +105,7 @@ class SourceTestUtilsTest(unittest.TestCase):
source = self._create_source(data)
stats = source_test_utils.SplitFractionStatistics([], [])
- source_test_utils.assertSplitAtFractionBinary(
+ source_test_utils.assert_split_at_fraction_binary(
source, data, 10, 0.5, None, 0.8, None, stats)
# These lists should not be empty now.
@@ -115,7 +115,7 @@ class SourceTestUtilsTest(unittest.TestCase):
def test_split_at_fraction_exhaustive(self):
data = self._create_data(10)
source = self._create_source(data)
- source_test_utils.assertSplitAtFractionExhaustive(source)
+ source_test_utils.assert_split_at_fraction_exhaustive(source)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index b3f4391..90dc665 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -255,7 +255,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
sources_info = ([
(split.source, split.start_position, split.stop_position) for
split in splits])
- source_test_utils.assertSourcesEqualReferenceSource(
+ source_test_utils.assert_sources_equal_reference_source(
reference_source_info, sources_info)
def test_progress(self):
@@ -291,7 +291,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
assert len(expected_data) == 10
source = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
coders.StrUtf8Coder())
- source_test_utils.assertReentrantReadsSucceed((source, None, None))
+ source_test_utils.assert_reentrant_reads_succeed((source, None, None))
def test_read_reentrant_after_splitting(self):
file_name, expected_data = write_data(10)
@@ -300,7 +300,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
coders.StrUtf8Coder())
splits = [split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
- source_test_utils.assertReentrantReadsSucceed(
+ source_test_utils.assert_reentrant_reads_succeed(
(splits[0].source, splits[0].start_position, splits[0].stop_position))
def test_dynamic_work_rebalancing(self):
@@ -310,7 +310,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
coders.StrUtf8Coder())
splits = [split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
- source_test_utils.assertSplitAtFractionExhaustive(
+ source_test_utils.assert_split_at_fraction_exhaustive(
splits[0].source, splits[0].start_position, splits[0].stop_position)
def test_dynamic_work_rebalancing_windows_eol(self):
@@ -320,7 +320,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
coders.StrUtf8Coder())
splits = [split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
- source_test_utils.assertSplitAtFractionExhaustive(
+ source_test_utils.assert_split_at_fraction_exhaustive(
splits[0].source, splits[0].start_position, splits[0].stop_position,
perform_multi_threaded_test=False)
@@ -331,7 +331,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
coders.StrUtf8Coder())
splits = [split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
- source_test_utils.assertSplitAtFractionExhaustive(
+ source_test_utils.assert_split_at_fraction_exhaustive(
splits[0].source, splits[0].start_position, splits[0].stop_position,
perform_multi_threaded_test=False)
@@ -449,7 +449,7 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
sources_info = ([
(split.source, split.start_position, split.stop_position) for
split in splits])
- source_test_utils.assertSourcesEqualReferenceSource(
+ source_test_utils.assert_sources_equal_reference_source(
reference_source_info, sources_info)
def test_read_gzip_empty_file(self):
@@ -561,10 +561,10 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
(split.source, split.start_position, split.stop_position) for
split in splits])
self.assertGreater(len(sources_info), 1)
- reference_lines = source_test_utils.readFromSource(*reference_source_info)
+ reference_lines = source_test_utils.read_from_source(*reference_source_info)
split_lines = []
for source_info in sources_info:
- split_lines.extend(source_test_utils.readFromSource(*source_info))
+ split_lines.extend(source_test_utils.read_from_source(*source_info))
self.assertEqual(expected_data[2:], reference_lines)
self.assertEqual(reference_lines, split_lines)
http://git-wip-us.apache.org/repos/asf/beam/blob/1f66fbdc/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index f4b1f07..2352acd 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -41,7 +41,7 @@ class CreateTest(unittest.TestCase):
def check_read(self, values, coder):
source = Create._create_source_from_iterable(values, coder)
- read_values = source_test_utils.readFromSource(source)
+ read_values = source_test_utils.read_from_source(source)
self.assertEqual(sorted(values), sorted(read_values))
def test_create_source_read_with_initial_splits(self):
@@ -73,27 +73,27 @@ class CreateTest(unittest.TestCase):
splits_info = [
(split.source, split.start_position, split.stop_position)
for split in splits]
- source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
- splits_info)
+ source_test_utils.assert_sources_equal_reference_source(
+ (source, None, None), splits_info)
def test_create_source_read_reentrant(self):
source = Create._create_source_from_iterable(range(9), self.coder)
- source_test_utils.assertReentrantReadsSucceed((source, None, None))
+ source_test_utils.assert_reentrant_reads_succeed((source, None, None))
def test_create_source_read_reentrant_with_initial_splits(self):
source = Create._create_source_from_iterable(range(24), self.coder)
for split in source.split(desired_bundle_size=5):
- source_test_utils.assertReentrantReadsSucceed((split.source,
- split.start_position,
- split.stop_position))
+ source_test_utils.assert_reentrant_reads_succeed((split.source,
+ split.start_position,
+ split.stop_position))
def test_create_source_dynamic_splitting(self):
# 2 values
source = Create._create_source_from_iterable(range(2), self.coder)
- source_test_utils.assertSplitAtFractionExhaustive(source)
+ source_test_utils.assert_split_at_fraction_exhaustive(source)
# Multiple values.
source = Create._create_source_from_iterable(range(11), self.coder)
- source_test_utils.assertSplitAtFractionExhaustive(
+ source_test_utils.assert_split_at_fraction_exhaustive(
source, perform_multi_threaded_test=True)
def test_create_source_progress(self):