You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/28 20:17:46 UTC
[beam] branch release-2.13.0 updated: [BEAM-6380] Allow rewinding PipeStream
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch release-2.13.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.13.0 by this push:
new 610a00b [BEAM-6380] Allow rewinding PipeStream
new 33d53bc Merge pull request #8702 from chamikaramj/cherrypick_pr_8661_2
610a00b is described below
commit 610a00beba0ce16f36693ac9c60523b790760d68
Author: Udi Meiri <eh...@google.com>
AuthorDate: Wed May 22 18:17:16 2019 -0700
[BEAM-6380] Allow rewinding PipeStream
Can only rewind to the beginning of the last block read.
Also fixes error detection in test thread.
---
sdks/python/apache_beam/io/filesystemio.py | 33 ++++++++++++----
sdks/python/apache_beam/io/filesystemio_test.py | 51 ++++++++++++++++++++-----
2 files changed, 66 insertions(+), 18 deletions(-)
diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index dca341d..a373a00 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -208,16 +208,25 @@ class UploaderStream(io.RawIOBase):
class PipeStream(object):
- """A class that presents a pipe connection as a readable stream."""
+ """A class that presents a pipe connection as a readable stream.
+
+ Not thread-safe.
+
+ Remembers the last ``size`` bytes read and allows rewinding the stream by that
+ amount exactly. See BEAM-6380 for more.
+ """
def __init__(self, recv_pipe):
self.conn = recv_pipe
self.closed = False
- # TODO(BEAM-6380): For debugging.
- self.last_position = None
self.position = 0
self.remaining = b''
+ # Data and position of last block streamed. Allows limited seeking backwards
+ # of stream.
+ self.last_block_position = None
+ self.last_block = b''
+
def read(self, size):
"""Read data from the wrapped pipe connection.
@@ -230,11 +239,12 @@ class PipeStream(object):
"""
data_list = []
bytes_read = 0
+ self.last_block_position = self.position
+
while bytes_read < size:
bytes_from_remaining = min(size - bytes_read, len(self.remaining))
data_list.append(self.remaining[0:bytes_from_remaining])
self.remaining = self.remaining[bytes_from_remaining:]
- self.last_position = self.position
self.position += bytes_from_remaining
bytes_read += bytes_from_remaining
if not self.remaining:
@@ -242,7 +252,8 @@ class PipeStream(object):
self.remaining = self.conn.recv_bytes()
except EOFError:
break
- return b''.join(data_list)
+ self.last_block = b''.join(data_list)
+ return self.last_block
def tell(self):
"""Tell the file's current offset.
@@ -262,11 +273,17 @@ class PipeStream(object):
# must have this no-op method here in that case.
if whence == os.SEEK_END and offset == 0:
return
- elif whence == os.SEEK_SET and offset == self.position:
- return
+ elif whence == os.SEEK_SET:
+ if offset == self.position:
+ return
+ elif offset == self.last_block_position and self.last_block:
+ self.position = offset
+ self.remaining = b''.join([self.last_block, self.remaining])
+ self.last_block = b''
+ return
raise NotImplementedError(
'offset: %s, whence: %s, position: %s, last: %s' % (
- offset, whence, self.position, self.last_position))
+ offset, whence, self.position, self.last_block_position))
def _check_open(self):
if self.closed:
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 2177459..72e7f0d 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -150,7 +150,7 @@ class TestUploaderStream(unittest.TestCase):
class TestPipeStream(unittest.TestCase):
- def _read_and_verify(self, stream, expected, buffer_size):
+ def _read_and_verify(self, stream, expected, buffer_size, success):
data_list = []
bytes_read = 0
seen_last_block = False
@@ -169,6 +169,33 @@ class TestPipeStream(unittest.TestCase):
bytes_read += len(data)
self.assertEqual(stream.tell(), bytes_read)
self.assertEqual(b''.join(data_list), expected)
+ success[0] = True
+
+ def _read_and_seek(self, stream, expected, buffer_size, success):
+ data_list = []
+ bytes_read = 0
+ while True:
+ data = stream.read(buffer_size)
+
+ # Test bad seek positions.
+ with self.assertRaises(NotImplementedError):
+ stream.seek(bytes_read + 1)
+ with self.assertRaises(NotImplementedError):
+ stream.seek(bytes_read - 1)
+
+ # Rewind stream and test that it reads back the same data again.
+ stream.seek(bytes_read)
+ data2 = stream.read(buffer_size)
+ self.assertEqual(data, data2)
+
+ if not data:
+ break
+ data_list.append(data)
+ bytes_read += len(data)
+ self.assertEqual(stream.tell(), bytes_read)
+ self.assertEqual(len(b''.join(data_list)), len(expected))
+ self.assertEqual(b''.join(data_list), expected)
+ success[0] = True
def test_pipe_stream(self):
block_sizes = list(4**i for i in range(0, 12))
@@ -178,15 +205,19 @@ class TestPipeStream(unittest.TestCase):
buffer_sizes = [100001, 512 * 1024, 1024 * 1024]
for buffer_size in buffer_sizes:
- parent_conn, child_conn = multiprocessing.Pipe()
- stream = filesystemio.PipeStream(child_conn)
- child_thread = threading.Thread(
- target=self._read_and_verify, args=(stream, expected, buffer_size))
- child_thread.start()
- for data in data_blocks:
- parent_conn.send_bytes(data)
- parent_conn.close()
- child_thread.join()
+ for target in [self._read_and_verify, self._read_and_seek]:
+ logging.info('buffer_size=%s, target=%s' % (buffer_size, target))
+ parent_conn, child_conn = multiprocessing.Pipe()
+ stream = filesystemio.PipeStream(child_conn)
+ success = [False]
+ child_thread = threading.Thread(
+ target=target, args=(stream, expected, buffer_size, success))
+ child_thread.start()
+ for data in data_blocks:
+ parent_conn.send_bytes(data)
+ parent_conn.close()
+ child_thread.join()
+ self.assertTrue(success[0], 'error in test thread')
if __name__ == '__main__':