You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 19:10:25 UTC

[1/3] incubator-beam git commit: Document that source objects should not be mutated.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk fe1f39609 -> 6e6d89d48


Document that source objects should not be mutated.

Updates textio._TextSource so that it does not get mutated while reading.

Updates source_test_utils so that source objects do not get cloned while testing. This could help to catch sources that erroneously get modified while reading.

Adds reentrancy tests for text and Avro sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ab8d62a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ab8d62a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ab8d62a

Branch: refs/heads/python-sdk
Commit: 2ab8d62ac48481a52fa04c704491f3a5889de27c
Parents: fe1f396
Author: Chamikara Jayalath <ch...@google.com>
Authored: Wed Oct 12 16:51:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:08:41 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py       | 28 ++++++-
 sdks/python/apache_beam/io/iobase.py            |  8 ++
 sdks/python/apache_beam/io/source_test_utils.py | 13 +---
 sdks/python/apache_beam/io/textio.py            | 80 +++++++++++---------
 sdks/python/apache_beam/io/textio_test.py       | 37 +++++++++
 5 files changed, 121 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab8d62a/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 1c96d72..eb2c81c 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -110,7 +110,7 @@ class TestAvro(unittest.TestCase):
     return file_name_prefix + os.path.sep + 'mytemp*'
 
   def _run_avro_test(self, pattern, desired_bundle_size, perform_splitting,
-                     expected_result):
+                     expected_result, test_reentrancy=False):
     source = AvroSource(pattern)
 
     read_records = []
@@ -128,9 +128,23 @@ class TestAvro(unittest.TestCase):
           (split.source, split.start_position, split.stop_position)
           for split in splits
       ]
+      if test_reentrancy:
+        for source_info in sources_info:
+          reader_iter = source_info[0].read(source_info[0].get_range_tracker(
+              source_info[1], source_info[2]))
+          try:
+            next(reader_iter)
+          except StopIteration:
+            # Ignoring empty bundle
+            pass
+
       source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
                                                           sources_info)
     else:
+      if test_reentrancy:
+        reader_iter = source.read(source.get_range_tracker(None, None))
+        next(reader_iter)
+
       read_records = source_test_utils.readFromSource(source, None, None)
       self.assertItemsEqual(expected_result, read_records)
 
@@ -144,6 +158,18 @@ class TestAvro(unittest.TestCase):
     expected_result = self.RECORDS
     self._run_avro_test(file_name, 100, True, expected_result)
 
+  def test_read_reentrant_without_splitting(self):
+    file_name = self._write_data()
+    expected_result = self.RECORDS
+    self._run_avro_test(file_name, None, False, expected_result,
+                        test_reentrancy=True)
+
+  def test_read_reantrant_with_splitting(self):
+    file_name = self._write_data()
+    expected_result = self.RECORDS
+    self._run_avro_test(file_name, 100, True, expected_result,
+                        test_reentrancy=True)
+
   def test_read_without_splitting_multiple_blocks(self):
     file_name = self._write_data(count=12000)
     expected_result = self.RECORDS * 2000

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab8d62a/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 8239e26..edd3524 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -91,6 +91,14 @@ class BoundedSource(object):
       positions passed to the method ``get_range_tracker()`` are ``None``
   (2) Method read() will be invoked with the ``RangeTracker`` obtained in the
       previous step.
+
+  **Mutability**
+
+  A ``BoundedSource`` object should be fully mutated before being submitted
+  for reading. A ``BoundedSource`` object should not be mutated while
+  its methods (for example, ``read()``) are being invoked by a runner. Runner
+  implementations may invoke methods of ``BoundedSource`` objects through
+  multi-threaded and/or re-entrant execution modes.
   """
 
   def estimate_size(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab8d62a/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 13b1e91..33ab083 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -48,7 +48,6 @@ from collections import namedtuple
 import logging
 
 from multiprocessing.pool import ThreadPool
-from apache_beam.internal import pickler
 from apache_beam.io import iobase
 
 
@@ -81,7 +80,7 @@ def readFromSource(source, start_position=None, stop_position=None):
   values = []
   range_tracker = source.get_range_tracker(start_position, stop_position)
   assert isinstance(range_tracker, iobase.RangeTracker)
-  reader = _copy_source(source).read(range_tracker)
+  reader = source.read(range_tracker)
   for value in reader:
     values.append(value)
 
@@ -173,7 +172,7 @@ def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
     source while the second value of the tuple will be '-1'.
   """
   assert isinstance(source, iobase.BoundedSource)
-  expected_items = readFromSource(_copy_source(source), None, None)
+  expected_items = readFromSource(source, None, None)
   return _assertSplitAtFractionBehavior(
       source, expected_items, num_items_to_read_before_split, split_fraction,
       expected_outcome)
@@ -186,7 +185,7 @@ def _assertSplitAtFractionBehavior(
   range_tracker = source.get_range_tracker(start_position, stop_position)
   assert isinstance(range_tracker, iobase.RangeTracker)
   current_items = []
-  reader = _copy_source(source).read(range_tracker)
+  reader = source.read(range_tracker)
   # Reading 'num_items_to_read_before_split' items.
   reader_iter = iter(reader)
   for _ in range(num_items_to_read_before_split):
@@ -536,7 +535,7 @@ def _assertSplitAtFractionConcurrent(
 
   range_tracker = source.get_range_tracker(None, None)
   stop_position_before_split = range_tracker.stop_position()
-  reader = _copy_source(source).read(range_tracker)
+  reader = source.read(range_tracker)
   reader_iter = iter(reader)
 
   current_items = []
@@ -575,7 +574,3 @@ def _assertSplitAtFractionConcurrent(
       primary_range, residual_range, split_fraction)
 
   return res[1] > 0
-
-
-def _copy_source(source):
-  return pickler.loads(pickler.dumps(source))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab8d62a/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index f1f5a25..dcaceef 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -41,14 +41,20 @@ class _TextSource(filebasedsource.FileBasedSource):
 
   DEFAULT_READ_BUFFER_SIZE = 8192
 
+  class ReadBuffer(object):
+    # A buffer that gives the buffered data and next position in the
+    # buffer that should be read.
+
+    def __init__(self, data, position):
+      self.data = data
+      self.position = position
+
   def __init__(self, file_pattern, min_bundle_size,
                compression_type, strip_trailing_newlines, coder,
                buffer_size=DEFAULT_READ_BUFFER_SIZE):
     super(_TextSource, self).__init__(file_pattern, min_bundle_size,
                                       compression_type=compression_type)
-    self._buffer = ''
-    self._next_position_in_buffer = 0
-    self._file = None
+
     self._strip_trailing_newlines = strip_trailing_newlines
     self._compression_type = compression_type
     self._coder = coder
@@ -57,7 +63,9 @@ class _TextSource(filebasedsource.FileBasedSource):
   def read_records(self, file_name, range_tracker):
     start_offset = range_tracker.start_position()
 
-    self._file = self.open_file(file_name)
+    read_buffer = _TextSource.ReadBuffer('', 0)
+    file_to_read = self.open_file(file_name)
+
     try:
       if start_offset > 0:
         # Seeking to one position before the start index and ignoring the
@@ -65,98 +73,100 @@ class _TextSource(filebasedsource.FileBasedSource):
         # belongs to the current bundle, hence ignoring that is incorrect.
         # Seeking to one byte before prevents that.
 
-        self._file.seek(start_offset - 1)
-        sep_bounds = self._find_separator_bounds()
+        file_to_read.seek(start_offset - 1)
+        sep_bounds = self._find_separator_bounds(file_to_read, read_buffer)
         if not sep_bounds:
           # Could not find a separator after (start_offset - 1). This means that
           # none of the records within the file belongs to the current source.
           return
 
         _, sep_end = sep_bounds
-        self._buffer = self._buffer[sep_end:]
+        read_buffer.data = read_buffer.data[sep_end:]
         next_record_start_position = start_offset -1 + sep_end
       else:
         next_record_start_position = 0
 
       while range_tracker.try_claim(next_record_start_position):
-        record, num_bytes_to_next_record = self._read_record()
+        record, num_bytes_to_next_record = self._read_record(file_to_read,
+                                                             read_buffer)
         yield self._coder.decode(record)
         if num_bytes_to_next_record < 0:
           break
         next_record_start_position += num_bytes_to_next_record
     finally:
-      self._file.close()
+      file_to_read.close()
 
-  def _find_separator_bounds(self):
-    # Determines the start and end positions within 'self._buffer' of the next
-    # separator starting from 'self._next_position_in_buffer'.
+  def _find_separator_bounds(self, file_to_read, read_buffer):
+    # Determines the start and end positions within 'read_buffer.data' of the
+    # next separator starting from position 'read_buffer.position'.
     # Currently supports following separators.
     # * '\n'
     # * '\r\n'
     # This method may increase the size of buffer but it will not decrease the
     # size of it.
 
-    current_pos = self._next_position_in_buffer
+    current_pos = read_buffer.position
 
     while True:
-      if current_pos >= len(self._buffer):
+      if current_pos >= len(read_buffer.data):
         # Ensuring that there are enough bytes to determine if there is a '\n'
         # at current_pos.
-        if not self._try_to_ensure_num_bytes_in_buffer(current_pos + 1):
+        if not self._try_to_ensure_num_bytes_in_buffer(
+            file_to_read, read_buffer, current_pos + 1):
           return
 
       # Using find() here is more efficient than a linear scan of the byte
       # array.
-      next_lf = self._buffer.find('\n', current_pos)
+      next_lf = read_buffer.data.find('\n', current_pos)
       if next_lf >= 0:
-        if self._buffer[next_lf - 1] == '\r':
+        if read_buffer.data[next_lf - 1] == '\r':
           return (next_lf - 1, next_lf + 1)
         else:
           return (next_lf, next_lf + 1)
 
-      current_pos = len(self._buffer)
+      current_pos = len(read_buffer.data)
 
-  def _try_to_ensure_num_bytes_in_buffer(self, num_bytes):
+  def _try_to_ensure_num_bytes_in_buffer(
+      self, file_to_read, read_buffer, num_bytes):
     # Tries to ensure that there are at least num_bytes bytes in the buffer.
     # Returns True if this can be fulfilled, returned False if this cannot be
     # fulfilled due to reaching EOF.
-    while len(self._buffer) < num_bytes:
-      read_data = self._file.read(self._buffer_size)
+    while len(read_buffer.data) < num_bytes:
+      read_data = file_to_read.read(self._buffer_size)
       if not read_data:
         return False
 
-      self._buffer += read_data
+      read_buffer.data += read_data
 
     return True
 
-  def _read_record(self):
+  def _read_record(self, file_to_read, read_buffer):
     # Returns a tuple containing the current_record and number of bytes to the
     # next record starting from 'self._next_position_in_buffer'. If EOF is
     # reached, returns a tuple containing the current record and -1.
 
-    if self._next_position_in_buffer > self._buffer_size:
-      # Buffer is too large. Truncating it and adjusting
-      # self._next_position_in_buffer.
-      self._buffer = self._buffer[self._next_position_in_buffer:]
-      self._next_position_in_buffer = 0
+    if read_buffer.position > self._buffer_size:
+      # read_buffer is too large. Truncating and adjusting it.
+      read_buffer.data = read_buffer.data[read_buffer.position:]
+      read_buffer.position = 0
 
-    record_start_position_in_buffer = self._next_position_in_buffer
-    sep_bounds = self._find_separator_bounds()
-    self._next_position_in_buffer = sep_bounds[1] if sep_bounds else len(
-        self._buffer)
+    record_start_position_in_buffer = read_buffer.position
+    sep_bounds = self._find_separator_bounds(file_to_read, read_buffer)
+    read_buffer.position = sep_bounds[1] if sep_bounds else len(
+        read_buffer.data)
 
     if not sep_bounds:
       # Reached EOF. Bytes up to the EOF is the next record. Returning '-1' for
       # the starting position of the next record.
-      return (self._buffer[record_start_position_in_buffer:], -1)
+      return (read_buffer.data[record_start_position_in_buffer:], -1)
 
     if self._strip_trailing_newlines:
       # Current record should not contain the separator.
-      return (self._buffer[record_start_position_in_buffer:sep_bounds[0]],
+      return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
               sep_bounds[1] - record_start_position_in_buffer)
     else:
       # Current record should contain the separator.
-      return (self._buffer[record_start_position_in_buffer:sep_bounds[1]],
+      return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
               sep_bounds[1] - record_start_position_in_buffer)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab8d62a/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 109506a..90ff3cc 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -198,6 +198,43 @@ class TextSourceTest(unittest.TestCase):
     self.assertEqual(
         [float(i) / 10 for i in range(0, 10)], fraction_consumed_report)
 
+  def test_read_reentrant_without_splitting(self):
+    file_name, expected_data = write_data(10)
+    assert len(expected_data) == 10
+    source1 = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
+                         coders.StrUtf8Coder())
+    reader_iter = source1.read(source1.get_range_tracker(None, None))
+    next(reader_iter)
+    next(reader_iter)
+
+    source2 = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
+                         coders.StrUtf8Coder())
+    source_test_utils.assertSourcesEqualReferenceSource((source1, None, None),
+                                                        [(source2, None, None)])
+
+  def test_read_reentrant_after_splitting(self):
+    file_name, expected_data = write_data(10)
+    assert len(expected_data) == 10
+    source = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
+                        coders.StrUtf8Coder())
+    splits1 = [split for split in source.split(desired_bundle_size=100000)]
+    assert len(splits1) == 1
+    reader_iter = splits1[0].source.read(
+        splits1[0].source.get_range_tracker(
+            splits1[0].start_position, splits1[0].stop_position))
+    next(reader_iter)
+    next(reader_iter)
+
+    splits2 = [split for split in source.split(desired_bundle_size=100000)]
+    assert len(splits2) == 1
+    source_test_utils.assertSourcesEqualReferenceSource(
+        (splits1[0].source,
+         splits1[0].start_position,
+         splits1[0].stop_position),
+        [(splits2[0].source,
+          splits2[0].start_position,
+          splits2[0].stop_position)])
+
   def test_dynamic_work_rebalancing(self):
     file_name, expected_data = write_data(15)
     assert len(expected_data) == 15


[3/3] incubator-beam git commit: Closes #1090

Posted by ro...@apache.org.
Closes #1090


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6e6d89d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6e6d89d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6e6d89d4

Branch: refs/heads/python-sdk
Commit: 6e6d89d48fe7a6bca574e1637bee6fa001c13db0
Parents: fe1f396 bdcb04c
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 18 12:09:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:09:20 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py       |  14 +++
 sdks/python/apache_beam/io/iobase.py            |   7 ++
 sdks/python/apache_beam/io/source_test_utils.py |  68 ++++++++++--
 sdks/python/apache_beam/io/textio.py            | 103 ++++++++++++-------
 sdks/python/apache_beam/io/textio_test.py       |  17 +++
 5 files changed, 165 insertions(+), 44 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Adds an assertion to source_test_utils for testing reentrancy.

Posted by ro...@apache.org.
Adds an assertion to source_test_utils for testing reentrancy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bdcb04cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bdcb04cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bdcb04cb

Branch: refs/heads/python-sdk
Commit: bdcb04cb9146d035339f02559127a810166721ab
Parents: 2ab8d62
Author: Chamikara Jayalath <ch...@google.com>
Authored: Sat Oct 15 17:49:13 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:09:19 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py       | 30 ++++-------
 sdks/python/apache_beam/io/iobase.py            |  5 +-
 sdks/python/apache_beam/io/source_test_utils.py | 55 ++++++++++++++++++++
 sdks/python/apache_beam/io/textio.py            | 29 +++++++++--
 sdks/python/apache_beam/io/textio_test.py       | 34 +++---------
 5 files changed, 99 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdcb04cb/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index eb2c81c..f72c3f3 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -110,7 +110,7 @@ class TestAvro(unittest.TestCase):
     return file_name_prefix + os.path.sep + 'mytemp*'
 
   def _run_avro_test(self, pattern, desired_bundle_size, perform_splitting,
-                     expected_result, test_reentrancy=False):
+                     expected_result):
     source = AvroSource(pattern)
 
     read_records = []
@@ -128,23 +128,9 @@ class TestAvro(unittest.TestCase):
           (split.source, split.start_position, split.stop_position)
           for split in splits
       ]
-      if test_reentrancy:
-        for source_info in sources_info:
-          reader_iter = source_info[0].read(source_info[0].get_range_tracker(
-              source_info[1], source_info[2]))
-          try:
-            next(reader_iter)
-          except StopIteration:
-            # Ignoring empty bundle
-            pass
-
       source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
                                                           sources_info)
     else:
-      if test_reentrancy:
-        reader_iter = source.read(source.get_range_tracker(None, None))
-        next(reader_iter)
-
       read_records = source_test_utils.readFromSource(source, None, None)
       self.assertItemsEqual(expected_result, read_records)
 
@@ -160,15 +146,17 @@ class TestAvro(unittest.TestCase):
 
   def test_read_reentrant_without_splitting(self):
     file_name = self._write_data()
-    expected_result = self.RECORDS
-    self._run_avro_test(file_name, None, False, expected_result,
-                        test_reentrancy=True)
+    source = AvroSource(file_name)
+    source_test_utils.assertReentrantReadsSucceed((source, None, None))
 
   def test_read_reantrant_with_splitting(self):
     file_name = self._write_data()
-    expected_result = self.RECORDS
-    self._run_avro_test(file_name, 100, True, expected_result,
-                        test_reentrancy=True)
+    source = AvroSource(file_name)
+    splits = [
+        split for split in source.split(desired_bundle_size=100000)]
+    assert len(splits) == 1
+    source_test_utils.assertReentrantReadsSucceed(
+        (splits[0].source, splits[0].start_position, splits[0].stop_position))
 
   def test_read_without_splitting_multiple_blocks(self):
     file_name = self._write_data(count=12000)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdcb04cb/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index edd3524..9701964 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -94,11 +94,10 @@ class BoundedSource(object):
 
   **Mutability**
 
-  A ``BoundedSource`` object should be fully mutated before being submitted
-  for reading. A ``BoundedSource`` object should not be mutated while
+  A ``BoundedSource`` object should not be mutated while
   its methods (for example, ``read()``) are being invoked by a runner. Runner
   implementations may invoke methods of ``BoundedSource`` objects through
-  multi-threaded and/or re-entrant execution modes.
+  multi-threaded and/or reentrant execution modes.
   """
 
   def estimate_size(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdcb04cb/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 33ab083..480a95d 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -151,6 +151,61 @@ def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
         'same set of records.')
 
 
+def assertReentrantReadsSucceed(source_info):
+  """Tests if a given source can be read in a reentrant manner.
+
+  Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
+  i in range [1, n-1] this method performs a reentrant read after reading i
+  elements and verifies that both the original and reentrant read produce the
+  expected set of values.
+
+  Args:
+    source_info: a three-tuple that gives the reference
+                 ``iobase.BoundedSource``, position to start reading at, and a
+                 position to stop reading at.
+  Raises:
+    ValueError: if source is too trivial or reentrant read result in an
+                incorrect read.
+  """
+
+  source, start_position, stop_position = source_info
+  assert isinstance(source, iobase.BoundedSource)
+
+  expected_values = [val for val in source.read(source.get_range_tracker(
+      start_position, stop_position))]
+  if len(expected_values) < 2:
+    raise ValueError('Source is too trivial since it produces only %d '
+                     'values. Please give a source that reads at least 2 '
+                     'values.', len(expected_values))
+
+  for i in range(1, len(expected_values) - 1):
+    read_iter = source.read(source.get_range_tracker(
+        start_position, stop_position))
+    original_read = []
+    for _ in range(i):
+      original_read.append(next(read_iter))
+
+    # Reentrant read
+    reentrant_read = [val for val in source.read(
+        source.get_range_tracker(start_position, stop_position))]
+
+    # Continuing original read.
+    for val in read_iter:
+      original_read.append(val)
+
+    if sorted(original_read) != sorted(expected_values):
+      raise ValueError('Source did not produce expected values when '
+                       'performing a reentrant read after reading %d values. '
+                       'Expected %r received %r.',
+                       i, expected_values, original_read)
+
+    if sorted(reentrant_read) != sorted(expected_values):
+      raise ValueError('A reentrant read of source after reading %d values '
+                       'did not produce expected values. Expected %r '
+                       'received %r.',
+                       i, expected_values, reentrant_read)
+
+
 def assertSplitAtFractionBehavior(source, num_items_to_read_before_split,
                                   split_fraction, expected_outcome):
   """Verifies the behaviour of splitting a source at a given fraction.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdcb04cb/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index dcaceef..01f6ef6 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -46,8 +46,29 @@ class _TextSource(filebasedsource.FileBasedSource):
     # buffer that should be read.
 
     def __init__(self, data, position):
-      self.data = data
-      self.position = position
+      self._data = data
+      self._position = position
+
+    @property
+    def data(self):
+      return self._data
+
+    @data.setter
+    def data(self, value):
+      assert isinstance(value, bytes)
+      self._data = value
+
+    @property
+    def position(self):
+      return self._position
+
+    @position.setter
+    def position(self, value):
+      assert isinstance(value, (int, long))
+      if value > len(self._data):
+        raise ValueError('Cannot set position to %d since it\'s larger than '
+                         'size of data %d.', value, len(self._data))
+      self._position = value
 
   def __init__(self, file_pattern, min_bundle_size,
                compression_type, strip_trailing_newlines, coder,
@@ -119,9 +140,11 @@ class _TextSource(filebasedsource.FileBasedSource):
       # array.
       next_lf = read_buffer.data.find('\n', current_pos)
       if next_lf >= 0:
-        if read_buffer.data[next_lf - 1] == '\r':
+        if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
+          # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
+          # Found a '\n'. Accepting that as the next separator.
           return (next_lf, next_lf + 1)
 
       current_pos = len(read_buffer.data)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdcb04cb/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 90ff3cc..81d04ab 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -201,39 +201,19 @@ class TextSourceTest(unittest.TestCase):
   def test_read_reentrant_without_splitting(self):
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
-    source1 = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
-                         coders.StrUtf8Coder())
-    reader_iter = source1.read(source1.get_range_tracker(None, None))
-    next(reader_iter)
-    next(reader_iter)
-
-    source2 = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
-                         coders.StrUtf8Coder())
-    source_test_utils.assertSourcesEqualReferenceSource((source1, None, None),
-                                                        [(source2, None, None)])
+    source = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
+                        coders.StrUtf8Coder())
+    source_test_utils.assertReentrantReadsSucceed((source, None, None))
 
   def test_read_reentrant_after_splitting(self):
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
     source = TextSource(file_name, 0, CompressionTypes.UNCOMPRESSED, True,
                         coders.StrUtf8Coder())
-    splits1 = [split for split in source.split(desired_bundle_size=100000)]
-    assert len(splits1) == 1
-    reader_iter = splits1[0].source.read(
-        splits1[0].source.get_range_tracker(
-            splits1[0].start_position, splits1[0].stop_position))
-    next(reader_iter)
-    next(reader_iter)
-
-    splits2 = [split for split in source.split(desired_bundle_size=100000)]
-    assert len(splits2) == 1
-    source_test_utils.assertSourcesEqualReferenceSource(
-        (splits1[0].source,
-         splits1[0].start_position,
-         splits1[0].stop_position),
-        [(splits2[0].source,
-          splits2[0].start_position,
-          splits2[0].stop_position)])
+    splits = [split for split in source.split(desired_bundle_size=100000)]
+    assert len(splits) == 1
+    source_test_utils.assertReentrantReadsSucceed(
+        (splits[0].source, splits[0].start_position, splits[0].stop_position))
 
   def test_dynamic_work_rebalancing(self):
     file_name, expected_data = write_data(15)