You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ch...@apache.org on 2017/08/03 18:05:45 UTC
[1/2] beam git commit: Adds support for reading concatenated bzip2 files.
Repository: beam
Updated Branches:
refs/heads/master 55bb423d8 -> d4f9e9268
Adds support for reading concatenated bzip2 files.
Adds tests for reading concatenated gzip and bzip2 files, and for failing on corrupted gzip and bzip2 files.
Removes the test 'test_model_textio_gzip_concatenated' from 'snippets_test.py', since it actually exercises 'DummyReadTransform' and therefore does not test this feature.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d462439
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d462439
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d462439
Branch: refs/heads/master
Commit: 5d46243992948ab6d4c9436e353989b49186354b
Parents: 55bb423
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Aug 2 22:49:33 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Aug 3 11:05:06 2017 -0700
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 16 ---
sdks/python/apache_beam/io/filesystem.py | 31 +++--
sdks/python/apache_beam/io/textio_test.py | 115 +++++++++++++++++++
3 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 31f71b3..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase):
snippets.model_textio_compressed(
{'read': gzip_file_name}, ['aa', 'bb', 'cc'])
- def test_model_textio_gzip_concatenated(self):
- temp_path_1 = self.create_temp_file('a\nb\nc\n')
- temp_path_2 = self.create_temp_file('p\nq\nr\n')
- temp_path_3 = self.create_temp_file('x\ny\nz')
- gzip_file_name = temp_path_1 + '.gz'
- with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
- dst.writelines(src)
- with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
- dst.writelines(src)
- with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
- dst.writelines(src)
- # Add the temporary gzip file to be cleaned up as well.
- self.temp_files.append(gzip_file_name)
- snippets.model_textio_compressed(
- {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
-
@unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
def test_model_datastoreio(self):
# We cannot test datastoreio functionality in unit tests therefore we limit
http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 1f65d0a..ef3040c 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -187,29 +187,26 @@ class CompressedFile(object):
del buf # Free up some possibly large and no-longer-needed memory.
self._read_buffer.write(decompressed)
else:
- # EOF reached.
- # Verify completeness and no corruption and flush (if needed by
- # the underlying algorithm).
- if self._compression_type == CompressionTypes.BZIP2:
- # Having unused_data past end of stream would imply file corruption.
- assert not self._decompressor.unused_data, 'Possible file corruption.'
- try:
- # EOF implies that the underlying BZIP2 stream must also have
- # reached EOF. We expect this to raise an EOFError and we catch it
- # below. Any other kind of error though would be problematic.
- self._decompressor.decompress('dummy')
- assert False, 'Possible file corruption.'
- except EOFError:
- pass # All is as expected!
- elif self._compression_type == CompressionTypes.GZIP:
- # If Gzip file check if there is unused data generated by gzip concat
+ # EOF of current stream reached.
+ #
+ # Any uncompressed data at the end of the stream of a gzip or bzip2
+ # file that is not corrupted points to a concatenated compressed
+ # file. We read concatenated files by recursively creating decompressor
+ # objects for the unused compressed data.
+ if (self._compression_type == CompressionTypes.BZIP2 or
+ self._compression_type == CompressionTypes.GZIP):
if self._decompressor.unused_data != '':
buf = self._decompressor.unused_data
- self._decompressor = zlib.decompressobj(self._gzip_mask)
+ self._decompressor = (
+ bz2.BZ2Decompressor()
+ if self._compression_type == CompressionTypes.BZIP2
+ else zlib.decompressobj(self._gzip_mask))
decompressed = self._decompressor.decompress(buf)
self._read_buffer.write(decompressed)
continue
else:
+ # Gzip and bzip2 formats do not require flushing remaining data in the
+ # decompressor into the read buffer when fully decompressing files.
self._read_buffer.write(self._decompressor.flush())
# Record that we have hit the end of file, so we won't unnecessarily
http://git-wip-us.apache.org/repos/asf/beam/blob/5d462439/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 9a4ec47..8bd7116 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -401,6 +401,64 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
assert_that(pcoll, equal_to(lines))
pipeline.run()
+ def test_read_corrupted_bzip2_fails(self):
+ _, lines = write_data(15)
+ file_name = self._create_temp_file()
+ with bz2.BZ2File(file_name, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ with open(file_name, 'wb') as f:
+ f.write('corrupt')
+
+ pipeline = TestPipeline()
+ pcoll = pipeline | 'Read' >> ReadFromText(
+ file_name,
+ compression_type=CompressionTypes.BZIP2)
+ assert_that(pcoll, equal_to(lines))
+ with self.assertRaises(Exception):
+ pipeline.run()
+
+ def test_read_bzip2_concat(self):
+ bzip2_file_name1 = self._create_temp_file()
+ lines = ['a', 'b', 'c']
+ with bz2.BZ2File(bzip2_file_name1, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ bzip2_file_name2 = self._create_temp_file()
+ lines = ['p', 'q', 'r']
+ with bz2.BZ2File(bzip2_file_name2, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ bzip2_file_name3 = self._create_temp_file()
+ lines = ['x', 'y', 'z']
+ with bz2.BZ2File(bzip2_file_name3, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ final_bzip2_file = self._create_temp_file()
+ with open(bzip2_file_name1, 'rb') as src, open(
+ final_bzip2_file, 'wb') as dst:
+ dst.writelines(src.readlines())
+
+ with open(bzip2_file_name2, 'rb') as src, open(
+ final_bzip2_file, 'ab') as dst:
+ dst.writelines(src.readlines())
+
+ with open(bzip2_file_name3, 'rb') as src, open(
+ final_bzip2_file, 'ab') as dst:
+ dst.writelines(src.readlines())
+
+ pipeline = TestPipeline()
+ lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
+ final_bzip2_file,
+ compression_type=beam.io.filesystem.CompressionTypes.BZIP2)
+
+ expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']
+ assert_that(lines, equal_to(expected))
+ pipeline.run()
+
def test_read_gzip(self):
_, lines = write_data(15)
file_name = self._create_temp_file()
@@ -415,6 +473,63 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
assert_that(pcoll, equal_to(lines))
pipeline.run()
+ def test_read_corrupted_gzip_fails(self):
+ _, lines = write_data(15)
+ file_name = self._create_temp_file()
+ with gzip.GzipFile(file_name, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ with open(file_name, 'wb') as f:
+ f.write('corrupt')
+
+ pipeline = TestPipeline()
+ pcoll = pipeline | 'Read' >> ReadFromText(
+ file_name,
+ 0, CompressionTypes.GZIP,
+ True, coders.StrUtf8Coder())
+ assert_that(pcoll, equal_to(lines))
+
+ with self.assertRaises(Exception):
+ pipeline.run()
+
+ def test_read_gzip_concat(self):
+ gzip_file_name1 = self._create_temp_file()
+ lines = ['a', 'b', 'c']
+ with gzip.open(gzip_file_name1, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ gzip_file_name2 = self._create_temp_file()
+ lines = ['p', 'q', 'r']
+ with gzip.open(gzip_file_name2, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ gzip_file_name3 = self._create_temp_file()
+ lines = ['x', 'y', 'z']
+ with gzip.open(gzip_file_name3, 'wb') as dst:
+ data = '\n'.join(lines) + '\n'
+ dst.write(data)
+
+ final_gzip_file = self._create_temp_file()
+ with open(gzip_file_name1, 'rb') as src, open(final_gzip_file, 'wb') as dst:
+ dst.writelines(src.readlines())
+
+ with open(gzip_file_name2, 'rb') as src, open(final_gzip_file, 'ab') as dst:
+ dst.writelines(src.readlines())
+
+ with open(gzip_file_name3, 'rb') as src, open(final_gzip_file, 'ab') as dst:
+ dst.writelines(src.readlines())
+
+ pipeline = TestPipeline()
+ lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
+ final_gzip_file,
+ compression_type=beam.io.filesystem.CompressionTypes.GZIP)
+
+ expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']
+ assert_that(lines, equal_to(expected))
+ pipeline.run()
+
def test_read_gzip_large(self):
_, lines = write_data(10000)
file_name = self._create_temp_file()
[2/2] beam git commit: This closes #3678
Posted by ch...@apache.org.
This closes #3678
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4f9e926
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4f9e926
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4f9e926
Branch: refs/heads/master
Commit: d4f9e92680eb1ca88a1a524fc9dedd0f172e226c
Parents: 55bb423 5d46243
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Aug 3 11:05:25 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Aug 3 11:05:25 2017 -0700
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 16 ---
sdks/python/apache_beam/io/filesystem.py | 31 +++--
sdks/python/apache_beam/io/textio_test.py | 115 +++++++++++++++++++
3 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------