You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/28 20:17:46 UTC

[beam] branch release-2.13.0 updated: [BEAM-6380] Allow rewinding PipeStream

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch release-2.13.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.13.0 by this push:
     new 610a00b  [BEAM-6380] Allow rewinding PipeStream
     new 33d53bc  Merge pull request #8702 from chamikaramj/cherrypick_pr_8661_2
610a00b is described below

commit 610a00beba0ce16f36693ac9c60523b790760d68
Author: Udi Meiri <eh...@google.com>
AuthorDate: Wed May 22 18:17:16 2019 -0700

    [BEAM-6380] Allow rewinding PipeStream
    
    Can only rewind to the beginning of the last block read.
    
    Also fixes error detection in test thread.
---
 sdks/python/apache_beam/io/filesystemio.py      | 33 ++++++++++++----
 sdks/python/apache_beam/io/filesystemio_test.py | 51 ++++++++++++++++++++-----
 2 files changed, 66 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index dca341d..a373a00 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -208,16 +208,25 @@ class UploaderStream(io.RawIOBase):
 
 
 class PipeStream(object):
-  """A class that presents a pipe connection as a readable stream."""
+  """A class that presents a pipe connection as a readable stream.
+
+  Not thread-safe.
+
+  Remembers the block returned by the last ``read(size)`` call and allows
+  rewinding the stream to the start of that block only. See BEAM-6380 for more.
+  """
 
   def __init__(self, recv_pipe):
     self.conn = recv_pipe
     self.closed = False
-    # TODO(BEAM-6380): For debugging.
-    self.last_position = None
     self.position = 0
     self.remaining = b''
 
+    # Data and position of the last block streamed. Allows limited seeking
+    # backwards in the stream.
+    self.last_block_position = None
+    self.last_block = b''
+
   def read(self, size):
     """Read data from the wrapped pipe connection.
 
@@ -230,11 +239,12 @@ class PipeStream(object):
     """
     data_list = []
     bytes_read = 0
+    self.last_block_position = self.position
+
     while bytes_read < size:
       bytes_from_remaining = min(size - bytes_read, len(self.remaining))
       data_list.append(self.remaining[0:bytes_from_remaining])
       self.remaining = self.remaining[bytes_from_remaining:]
-      self.last_position = self.position
       self.position += bytes_from_remaining
       bytes_read += bytes_from_remaining
       if not self.remaining:
@@ -242,7 +252,8 @@ class PipeStream(object):
           self.remaining = self.conn.recv_bytes()
         except EOFError:
           break
-    return b''.join(data_list)
+    self.last_block = b''.join(data_list)
+    return self.last_block
 
   def tell(self):
     """Tell the file's current offset.
@@ -262,11 +273,17 @@ class PipeStream(object):
     # must have this no-op method here in that case.
     if whence == os.SEEK_END and offset == 0:
       return
-    elif whence == os.SEEK_SET and offset == self.position:
-      return
+    elif whence == os.SEEK_SET:
+      if offset == self.position:
+        return
+      elif offset == self.last_block_position and self.last_block:
+        self.position = offset
+        self.remaining = b''.join([self.last_block, self.remaining])
+        self.last_block = b''
+        return
     raise NotImplementedError(
         'offset: %s, whence: %s, position: %s, last: %s' % (
-            offset, whence, self.position, self.last_position))
+            offset, whence, self.position, self.last_block_position))
 
   def _check_open(self):
     if self.closed:
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 2177459..72e7f0d 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -150,7 +150,7 @@ class TestUploaderStream(unittest.TestCase):
 
 class TestPipeStream(unittest.TestCase):
 
-  def _read_and_verify(self, stream, expected, buffer_size):
+  def _read_and_verify(self, stream, expected, buffer_size, success):
     data_list = []
     bytes_read = 0
     seen_last_block = False
@@ -169,6 +169,33 @@ class TestPipeStream(unittest.TestCase):
       bytes_read += len(data)
       self.assertEqual(stream.tell(), bytes_read)
     self.assertEqual(b''.join(data_list), expected)
+    success[0] = True
+
+  def _read_and_seek(self, stream, expected, buffer_size, success):
+    data_list = []
+    bytes_read = 0
+    while True:
+      data = stream.read(buffer_size)
+
+      # Test bad seek positions.
+      with self.assertRaises(NotImplementedError):
+        stream.seek(bytes_read + 1)
+      with self.assertRaises(NotImplementedError):
+        stream.seek(bytes_read - 1)
+
+      # Rewind stream and test that it reads back the same data again.
+      stream.seek(bytes_read)
+      data2 = stream.read(buffer_size)
+      self.assertEqual(data, data2)
+
+      if not data:
+        break
+      data_list.append(data)
+      bytes_read += len(data)
+      self.assertEqual(stream.tell(), bytes_read)
+    self.assertEqual(len(b''.join(data_list)), len(expected))
+    self.assertEqual(b''.join(data_list), expected)
+    success[0] = True
 
   def test_pipe_stream(self):
     block_sizes = list(4**i for i in range(0, 12))
@@ -178,15 +205,19 @@ class TestPipeStream(unittest.TestCase):
     buffer_sizes = [100001, 512 * 1024, 1024 * 1024]
 
     for buffer_size in buffer_sizes:
-      parent_conn, child_conn = multiprocessing.Pipe()
-      stream = filesystemio.PipeStream(child_conn)
-      child_thread = threading.Thread(
-          target=self._read_and_verify, args=(stream, expected, buffer_size))
-      child_thread.start()
-      for data in data_blocks:
-        parent_conn.send_bytes(data)
-      parent_conn.close()
-      child_thread.join()
+      for target in [self._read_and_verify, self._read_and_seek]:
+        logging.info('buffer_size=%s, target=%s' % (buffer_size, target))
+        parent_conn, child_conn = multiprocessing.Pipe()
+        stream = filesystemio.PipeStream(child_conn)
+        success = [False]
+        child_thread = threading.Thread(
+            target=target, args=(stream, expected, buffer_size, success))
+        child_thread.start()
+        for data in data_blocks:
+          parent_conn.send_bytes(data)
+        parent_conn.close()
+        child_thread.join()
+        self.assertTrue(success[0], 'error in test thread')
 
 
 if __name__ == '__main__':