You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/06/06 23:59:46 UTC

[beam] branch master updated: [BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db82464  [BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)
db82464 is described below

commit db824645205c1480656b95ab74e4ea11cc9be344
Author: dlesco <da...@cbs.com>
AuthorDate: Thu Jun 6 19:59:33 2019 -0400

    [BEAM-6952] fixes concatenated compressed files bug with python sdk (#8187)
---
 sdks/python/apache_beam/io/filesystem.py      | 29 ++++-------
 sdks/python/apache_beam/io/filesystem_test.py | 73 +++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 19 deletions(-)

diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index efc745a..4df3e01 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -207,34 +207,25 @@ class CompressedFile(object):
                                  ) < num_bytes:
       # Continue reading from the underlying file object until enough bytes are
       # available, or EOF is reached.
-      buf = self._file.read(self._read_size)
+      if not self._decompressor.unused_data:
+        buf = self._file.read(self._read_size)
+      else:
+        # Any unused data left after the end of a gzip or bzip2 stream in
+        # a file that is not corrupted points to a concatenated compressed
+        # file. We read concatenated files by creating a fresh decompressor
+        # object for the unused compressed data.
+        buf = self._decompressor.unused_data
+        self._initialize_decompressor()
       if buf:
         decompressed = self._decompressor.decompress(buf)
         del buf  # Free up some possibly large and no-longer-needed memory.
         self._read_buffer.write(decompressed)
       else:
         # EOF of current stream reached.
-        #
-        # Any uncompressed data at the end of the stream of a gzip or bzip2
-        # file that is not corrupted points to a concatenated compressed
-        # file. We read concatenated files by recursively creating decompressor
-        # objects for the unused compressed data.
         if (self._compression_type == CompressionTypes.BZIP2 or
             self._compression_type == CompressionTypes.DEFLATE or
             self._compression_type == CompressionTypes.GZIP):
-          if self._decompressor.unused_data != b'':
-            buf = self._decompressor.unused_data
-
-            if self._compression_type == CompressionTypes.BZIP2:
-              self._decompressor = bz2.BZ2Decompressor()
-            elif self._compression_type == CompressionTypes.DEFLATE:
-              self._decompressor = zlib.decompressobj()
-            else:
-              self._decompressor = zlib.decompressobj(self._gzip_mask)
-
-            decompressed = self._decompressor.decompress(buf)
-            self._read_buffer.write(decompressed)
-            continue
+          pass
         else:
           # Deflate, Gzip and bzip2 formats do not require flushing
           # remaining data in the decompressor into the read buffer when
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index b26d79d..cb48f37 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -461,6 +461,79 @@ atomized in instants hammered around the
         if not line:
           break
 
+  def test_concatenated_compressed_file(self):
+    # The test apache_beam.io.textio_test.test_read_gzip_concat
+    # does not encounter the problem in the Beam 2.11 and earlier
+    # code base because its test data is too small: the data is
+    # smaller than read_size, so it takes a code path that avoids
+    # the problem. This test therefore uses a smaller read_size
+    # and larger test data in order to trigger the problem. It
+    # would be difficult to test this in the textio_test module:
+    # the default read_size is 16MiB, so very large test data
+    # would be needed, and the ReadFromText interface does not
+    # allow the read_size to be modified.
+    import random
+    import signal
+    from six import int2byte
+    num_test_lines = 10
+    timeout = 30
+    read_size = (64<<10) # set much smaller than the line size
+    byte_table = tuple(int2byte(i) for i in range(32, 96))
+
+    def generate_random_line():
+      byte_list = list(b
+                       for i in range(4096)
+                       for b in random.sample(byte_table, 64)
+                      )
+      byte_list.append(b'\n')
+      return b''.join(byte_list)
+
+    def create_test_file(compression_type, lines):
+      filenames = list()
+      file_name = self._create_temp_file()
+      if compression_type == CompressionTypes.BZIP2:
+        compress_factory = bz2.BZ2File
+      elif compression_type == CompressionTypes.GZIP:
+        compress_factory = gzip.open
+      else:
+        assert False, "Invalid compression type: %s" % compression_type
+      for line in lines:
+        filenames.append(self._create_temp_file())
+        with compress_factory(filenames[-1], 'wb') as f:
+          f.write(line)
+      with open(file_name, 'wb') as o:
+        for name in filenames:
+          with open(name, 'rb') as i:
+            o.write(i.read())
+      return file_name
+
+    # Some time ago, a job reading a real concatenated gzip file got
+    # stuck in an endless loop in the Beam filesystem module. This
+    # handler is here to trap such an endless loop. However, with the
+    # Beam 2.11 and earlier implementation this unit test does not hit
+    # an endless loop; it hits a different error. So the handler is
+    # not strictly necessary in this unit test.
+
+    def alarm_handler(signum, frame):
+      self.fail('Timed out reading compressed file.')
+      raise IOError('Exiting due to likley infinite loop logic in code.')
+
+    old_handler = signal.signal(signal.SIGALRM, alarm_handler)
+    try:
+      test_lines = tuple(generate_random_line() for i in range(num_test_lines))
+      for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+        file_name = create_test_file(compression_type, test_lines)
+        signal.alarm(timeout)
+        with open(file_name, 'rb') as f:
+          data = CompressedFile(f, compression_type, read_size=read_size)
+          for written_line in test_lines:
+            read_line = data.readline()
+            self.assertEqual(written_line, read_line)
+        signal.alarm(0)
+    finally:
+      signal.alarm(0)
+      signal.signal(signal.SIGALRM, old_handler)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)