Posted to commits@beam.apache.org by ch...@apache.org on 2019/06/06 23:59:46 UTC
[beam] branch master updated: [BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new db82464 [BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)
db82464 is described below
commit db824645205c1480656b95ab74e4ea11cc9be344
Author: dlesco <da...@cbs.com>
AuthorDate: Thu Jun 6 19:59:33 2019 -0400
[BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)
---
 sdks/python/apache_beam/io/filesystem.py      | 29 ++++-------
 sdks/python/apache_beam/io/filesystem_test.py | 73 +++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 19 deletions(-)
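For background: a concatenated compressed file is simply two or more complete gzip or bzip2 streams written back to back (for example, the output of "cat a.gz b.gz > concat.gz"). Python's zlib.decompressobj decodes only up to the end of the first stream and parks any following bytes in its unused_data attribute, so a reader has to watch that attribute and restart decompression on it. A minimal standalone sketch of that behavior (illustrative only, not part of this commit):

    import gzip
    import zlib

    # Two complete gzip members written back to back.
    data = gzip.compress(b'first stream\n') + gzip.compress(b'second stream\n')

    d = zlib.decompressobj(zlib.MAX_WBITS | 16)  # 16 selects gzip framing
    out = d.decompress(data)
    print(out)                  # b'first stream\n' -- only the first member
    print(bool(d.unused_data))  # True: the second member is still pending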
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index efc745a..4df3e01 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -207,34 +207,25 @@ class CompressedFile(object):
                                  ) < num_bytes:
       # Continue reading from the underlying file object until enough bytes are
       # available, or EOF is reached.
-      buf = self._file.read(self._read_size)
+      if not self._decompressor.unused_data:
+        buf = self._file.read(self._read_size)
+      else:
+        # Any uncompressed data at the end of the stream of a gzip or bzip2
+        # file that is not corrupted points to a concatenated compressed
+        # file. We read concatenated files by recursively creating decompressor
+        # objects for the unused compressed data.
+        buf = self._decompressor.unused_data
+        self._initialize_decompressor()
       if buf:
         decompressed = self._decompressor.decompress(buf)
         del buf  # Free up some possibly large and no-longer-needed memory.
         self._read_buffer.write(decompressed)
       else:
         # EOF of current stream reached.
-        #
-        # Any uncompressed data at the end of the stream of a gzip or bzip2
-        # file that is not corrupted points to a concatenated compressed
-        # file. We read concatenated files by recursively creating decompressor
-        # objects for the unused compressed data.
         if (self._compression_type == CompressionTypes.BZIP2 or
             self._compression_type == CompressionTypes.DEFLATE or
             self._compression_type == CompressionTypes.GZIP):
-          if self._decompressor.unused_data != b'':
-            buf = self._decompressor.unused_data
-
-            if self._compression_type == CompressionTypes.BZIP2:
-              self._decompressor = bz2.BZ2Decompressor()
-            elif self._compression_type == CompressionTypes.DEFLATE:
-              self._decompressor = zlib.decompressobj()
-            else:
-              self._decompressor = zlib.decompressobj(self._gzip_mask)
-
-            decompressed = self._decompressor.decompress(buf)
-            self._read_buffer.write(decompressed)
-            continue
+          pass
         else:
           # Deflate, Gzip and bzip2 formats do not require flushing
           # remaining data in the decompressor into the read buffer when
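The new logic above checks unused_data at the top of the read loop: whenever the previous decompressor left bytes behind, those bytes are fed to a freshly initialized decompressor before any more data is read from the underlying file. A standalone sketch of the same pattern (read_concatenated_gzip is a hypothetical helper for illustration, not Beam's API):

    import zlib

    def read_concatenated_gzip(fileobj, read_size=64 << 10):
      # Decompress a file made of back-to-back gzip members.
      decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
      output = []
      while True:
        if not decompressor.unused_data:
          buf = fileobj.read(read_size)
        else:
          # Leftover bytes belong to the next member: restart the
          # decompressor on them instead of reading from the file.
          buf = decompressor.unused_data
          decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
        if not buf:
          break  # true EOF: nothing left in the file or in unused_data
        output.append(decompressor.decompress(buf))
      return b''.join(output)

For example, read_concatenated_gzip(io.BytesIO(gzip.compress(b'a') + gzip.compress(b'b'))) returns b'ab', where a single decompressobj would stop after b'a'.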
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index b26d79d..cb48f37 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -461,6 +461,79 @@ atomized in instants hammered around the
       if not line:
         break
 
+  def test_concatenated_compressed_file(self):
+    # The test apache_beam.io.textio_test.test_read_gzip_concat
+    # does not encounter the problem in the Beam 2.11 and earlier
+    # code base because its test data is too small: the data is
+    # smaller than read_size, so it goes through a code path that
+    # happens to avoid the problem. This test therefore sets
+    # read_size smaller and makes the test data bigger, in order
+    # to encounter the problem. Testing this in the textio_test
+    # module would be difficult, because you would need very large
+    # test data: the default read_size is 16MiB, and the
+    # ReadFromText interface does not allow modifying read_size.
+    import random
+    import signal
+    from six import int2byte
+    num_test_lines = 10
+    timeout = 30
+    read_size = (64 << 10)  # set much smaller than the line size
+    byte_table = tuple(int2byte(i) for i in range(32, 96))
+
+    def generate_random_line():
+      byte_list = list(b
+                       for i in range(4096)
+                       for b in random.sample(byte_table, 64)
+                      )
+      byte_list.append(b'\n')
+      return b''.join(byte_list)
+
+    def create_test_file(compression_type, lines):
+      filenames = list()
+      file_name = self._create_temp_file()
+      if compression_type == CompressionTypes.BZIP2:
+        compress_factory = bz2.BZ2File
+      elif compression_type == CompressionTypes.GZIP:
+        compress_factory = gzip.open
+      else:
+        assert False, "Invalid compression type: %s" % compression_type
+      for line in lines:
+        filenames.append(self._create_temp_file())
+        with compress_factory(filenames[-1], 'wb') as f:
+          f.write(line)
+      with open(file_name, 'wb') as o:
+        for name in filenames:
+          with open(name, 'rb') as i:
+            o.write(i.read())
+      return file_name
+
+    # A job that read a real concatenated gzip file once got stuck in an
+    # endless loop in the Beam filesystem module, so this handler is
+    # installed to trap any endless loop. This unit test does not
+    # actually hit that loop (it hits a different error in the Beam 2.11
+    # and earlier implementation), so the handler is not strictly
+    # necessary here, but it keeps the test from hanging on a regression.
+
+    def alarm_handler(signum, frame):
+      self.fail('Timed out reading compressed file.')
+      raise IOError('Exiting due to likely infinite loop logic in code.')
+
+    old_handler = signal.signal(signal.SIGALRM, alarm_handler)
+    try:
+      test_lines = tuple(generate_random_line() for i in range(num_test_lines))
+      for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+        file_name = create_test_file(compression_type, test_lines)
+        signal.alarm(timeout)
+        with open(file_name, 'rb') as f:
+          data = CompressedFile(f, compression_type, read_size=read_size)
+          for written_line in test_lines:
+            read_line = data.readline()
+            self.assertEqual(written_line, read_line)
+        signal.alarm(0)
+    finally:
+      signal.alarm(0)
+      signal.signal(signal.SIGALRM, old_handler)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
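A small usage sketch of the fixed reader (assumes a Beam Python SDK that includes this commit; the file path and data are made up for illustration). Note that the bug only surfaced once a compressed member was larger than read_size, which is why the unit test above shrinks read_size to 64KiB and generates 256KiB lines:

    import gzip

    from apache_beam.io.filesystem import CompressedFile, CompressionTypes

    # Build a concatenated gzip file: two complete members back to back.
    with open('/tmp/concat.gz', 'wb') as out:
      for chunk in (b'hello\n', b'world\n'):
        out.write(gzip.compress(chunk))

    with open('/tmp/concat.gz', 'rb') as raw:
      reader = CompressedFile(raw, CompressionTypes.GZIP)
      print(reader.readline())  # b'hello\n'
      print(reader.readline())  # b'world\n', read across the member boundary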