You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/05 16:54:00 UTC
[1/2] beam git commit: [BEAM-778] Make filesystem._CompressedFile seekable.
Repository: beam
Updated Branches:
refs/heads/master 645d0bba9 -> e2a2836ad
[BEAM-778] Make filesystem._CompressedFile seekable.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10e5a22b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10e5a22b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10e5a22b
Branch: refs/heads/master
Commit: 10e5a22b8479bff249c708194e327137458ca2b3
Parents: 645d0bb
Author: Tibor Kiss <ti...@gmail.com>
Authored: Fri Mar 31 07:11:07 2017 +0200
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 5 09:51:42 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio_test.py | 31 ---
sdks/python/apache_beam/io/filesystem.py | 122 ++++++++++--
sdks/python/apache_beam/io/filesystem_test.py | 221 +++++++++++++++++++++
sdks/python/setup.py | 2 +-
4 files changed, 327 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 504a2b9..2409873 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -30,7 +30,6 @@ import hamcrest as hc
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
-from apache_beam.io.filesystem import CompressedFile
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -70,36 +69,6 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase):
return file_name
-class TestCompressedFile(_TestCaseWithTempDirCleanUp):
-
- def test_seekable(self):
- readable = CompressedFile(open(self._create_temp_file(), 'r'))
- self.assertFalse(readable.seekable)
-
- writeable = CompressedFile(open(self._create_temp_file(), 'w'))
- self.assertFalse(writeable.seekable)
-
- def test_tell(self):
- lines = ['line%d\n' % i for i in range(10)]
- tmpfile = self._create_temp_file()
- writeable = CompressedFile(open(tmpfile, 'w'))
- current_offset = 0
- for line in lines:
- writeable.write(line)
- current_offset += len(line)
- self.assertEqual(current_offset, writeable.tell())
-
- writeable.close()
- readable = CompressedFile(open(tmpfile))
- current_offset = 0
- while True:
- line = readable.readline()
- current_offset += len(line)
- self.assertEqual(current_offset, readable.tell())
- if not line:
- break
-
-
class MyFileSink(fileio.FileSink):
def open(self, temp_path):
http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index b0d2f48..e6c3c29 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -23,7 +23,10 @@ import bz2
import cStringIO
import os
import zlib
+import logging
+import time
+logger = logging.getLogger(__name__)
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -79,7 +82,8 @@ class CompressionTypes(object):
class CompressedFile(object):
- """Somewhat limited file wrapper for easier handling of compressed files."""
+ """File wrapper for easier handling of compressed files."""
+ # XXX: This class is not thread safe in the read path.
# The bit mask to use for the wbits parameters of the zlib compressor and
# decompressor objects.
@@ -107,6 +111,7 @@ class CompressedFile(object):
raise ValueError('File object must be at position 0 but was %d' %
self._file.tell())
self._uncompressed_position = 0
+ self._uncompressed_size = None
if self.readable():
self._read_size = read_size
@@ -114,24 +119,30 @@ class CompressedFile(object):
self._read_position = 0
self._read_eof = False
- if self._compression_type == CompressionTypes.BZIP2:
- self._decompressor = bz2.BZ2Decompressor()
- else:
- assert self._compression_type == CompressionTypes.GZIP
- self._decompressor = zlib.decompressobj(self._gzip_mask)
+ self._initialize_decompressor()
else:
self._decompressor = None
if self.writeable():
- if self._compression_type == CompressionTypes.BZIP2:
- self._compressor = bz2.BZ2Compressor()
- else:
- assert self._compression_type == CompressionTypes.GZIP
- self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED, self._gzip_mask)
+ self._initialize_compressor()
else:
self._compressor = None
+ def _initialize_decompressor(self):
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._decompressor = bz2.BZ2Decompressor()
+ else:
+ assert self._compression_type == CompressionTypes.GZIP
+ self._decompressor = zlib.decompressobj(self._gzip_mask)
+
+ def _initialize_compressor(self):
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._compressor = bz2.BZ2Compressor()
+ else:
+ assert self._compression_type == CompressionTypes.GZIP
+ self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+ zlib.DEFLATED, self._gzip_mask)
+
def readable(self):
mode = self._file.mode
return 'r' in mode or 'a' in mode
@@ -158,9 +169,7 @@ class CompressedFile(object):
# but without dropping any previous held data.
self._read_buffer.seek(self._read_position)
data = self._read_buffer.read()
- self._read_position = 0
- self._read_buffer.seek(0)
- self._read_buffer.truncate(0)
+ self._clear_read_buffer()
self._read_buffer.write(data)
while not self._read_eof and (self._read_buffer.tell() - self._read_position
@@ -250,8 +259,87 @@ class CompressedFile(object):
@property
def seekable(self):
- # TODO: Add support for seeking to a file position.
- return False
+ return self._file.mode == 'r'
+
+ def _clear_read_buffer(self):
+ """Clears the read buffer by removing all the contents and
+ resetting _read_position to 0"""
+ self._read_position = 0
+ self._read_buffer.seek(0)
+ self._read_buffer.truncate(0)
+
+ def _rewind_file(self):
+ """Seeks to the beginning of the input file. Input file's EOF marker
+ is cleared and _uncompressed_position is reset to zero"""
+ self._file.seek(0, os.SEEK_SET)
+ self._read_eof = False
+ self._uncompressed_position = 0
+
+ def _rewind(self):
+ """Seeks to the beginning of the input file and resets the internal read
+ buffer. The decompressor object is re-initialized to ensure that no data
+ is left in its buffer."""
+ self._clear_read_buffer()
+ self._rewind_file()
+
+ # Re-initialize decompressor to clear any data buffered prior to rewind
+ self._initialize_decompressor()
+
+ def seek(self, offset, whence=os.SEEK_SET):
+ """Set the file's current offset.
+
+ Seeking behavior:
+ * when seeking from the end (SEEK_END), the whole file is decompressed once
+ to determine its size. Therefore it is preferred to use
+ SEEK_SET or SEEK_CUR to avoid the processing overhead
+ * seeking backwards from the current position rewinds the file to 0
+ and decompresses the chunks to the requested offset
+ * seeking is only supported in files opened for reading
+ * if the new offset is out of bound, it is adjusted to either 0 or EOF.
+
+ Args:
+ offset: seek offset in the uncompressed content represented as number
+ whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
+ os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
+ (seek relative to the end, offset should be negative).
+
+ Raises:
+ IOError: When this buffer is closed.
+ ValueError: When whence is invalid or the file is not seekable
+ """
+ if whence == os.SEEK_SET:
+ absolute_offset = offset
+ elif whence == os.SEEK_CUR:
+ absolute_offset = self._uncompressed_position + offset
+ elif whence == os.SEEK_END:
+ # Determine and cache the uncompressed size of the file
+ if not self._uncompressed_size:
+ logger.warn("Seeking relative from end of file is requested. "
+ "Need to decompress the whole file once to determine "
+ "its size. This might take a while...")
+ uncompress_start_time = time.time()
+ while self.read(self._read_size):
+ pass
+ uncompress_end_time = time.time()
+ logger.warn("Full file decompression for seek from end took %.2f secs",
+ (uncompress_end_time - uncompress_start_time))
+ self._uncompressed_size = self._uncompressed_position
+ absolute_offset = self._uncompressed_size + offset
+ else:
+ raise ValueError("Whence mode %r is invalid." % whence)
+
+ # Determine how many bytes need to be read before we reach
+ # the requested offset. Rewind if we already passed the position.
+ if absolute_offset < self._uncompressed_position:
+ self._rewind()
+ bytes_to_skip = absolute_offset - self._uncompressed_position
+
+ # Read until the desired position is reached or EOF occurs.
+ while bytes_to_skip:
+ data = self.read(min(self._read_size, bytes_to_skip))
+ if not data:
+ break
+ bytes_to_skip -= len(data)
def tell(self):
"""Returns current position in uncompressed file."""
http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
new file mode 100644
index 0000000..168925d
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -0,0 +1,221 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for filesystem module."""
+import shutil
+import os
+import unittest
+import tempfile
+import bz2
+import gzip
+from StringIO import StringIO
+
+from apache_beam.io.filesystem import CompressedFile, CompressionTypes
+
+
+class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+ """Base class for TestCases that deals with TempDir clean-up.
+
+ Inherited test cases will call self._new_tempdir() to start a temporary dir
+ which will be deleted at the end of the tests (when tearDown() is called).
+ """
+
+ def setUp(self):
+ self._tempdirs = []
+
+ def tearDown(self):
+ for path in self._tempdirs:
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ self._tempdirs = []
+
+ def _new_tempdir(self):
+ result = tempfile.mkdtemp()
+ self._tempdirs.append(result)
+ return result
+
+ def _create_temp_file(self, name='', suffix=''):
+ if not name:
+ name = tempfile.template
+ file_name = tempfile.NamedTemporaryFile(
+ delete=False, prefix=name,
+ dir=self._new_tempdir(), suffix=suffix).name
+ return file_name
+
+
+class TestCompressedFile(_TestCaseWithTempDirCleanUp):
+ content = """- the BEAM -
+How things really are we would like to know.
+Does
+ Time
+ flow, is it elastic, or is it
+atomized in instants hammered around the
+ clock's face? ...
+- May Swenson"""
+
+ # Keep the read block size small so that we exercise the seek functionality
+ # in compressed file and not just in the internal buffer
+ read_block_size = 4
+
+ def _create_compressed_file(self, compression_type, content,
+ name='', suffix=''):
+ file_name = self._create_temp_file(name, suffix)
+
+ if compression_type == CompressionTypes.BZIP2:
+ compress_factory = bz2.BZ2File
+ elif compression_type == CompressionTypes.GZIP:
+ compress_factory = gzip.open
+ else:
+ assert False, "Invalid compression type: %s" % compression_type
+
+ with compress_factory(file_name, 'w') as f:
+ f.write(content)
+
+ return file_name
+
+ def test_seekable_enabled_on_read(self):
+ readable = CompressedFile(open(self._create_temp_file(), 'r'))
+ self.assertTrue(readable.seekable)
+
+ def test_seekable_disabled_on_write(self):
+ writeable = CompressedFile(open(self._create_temp_file(), 'w'))
+ self.assertFalse(writeable.seekable)
+
+ def test_seekable_disabled_on_append(self):
+ writeable = CompressedFile(open(self._create_temp_file(), 'a'))
+ self.assertFalse(writeable.seekable)
+
+ def test_seek_set(self):
+ for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+ file_name = self._create_compressed_file(compression_type, self.content)
+
+ compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+ read_size=self.read_block_size)
+ reference_fd = StringIO(self.content)
+
+ # Note: content (readline) check must come before position (tell) check
+ # because cStringIO's tell() reports out of bound positions (if we seek
+ # beyond the file) up until a real read occurs.
+ # _CompressedFile.tell() always stays within the bounds of the
+ # uncompressed content.
+ for seek_position in (-1, 0, 1,
+ len(self.content)-1, len(self.content),
+ len(self.content) + 1):
+ compressed_fd.seek(seek_position, os.SEEK_SET)
+ reference_fd.seek(seek_position, os.SEEK_SET)
+
+ uncompressed_line = compressed_fd.readline()
+ reference_line = reference_fd.readline()
+ self.assertEqual(uncompressed_line, reference_line)
+
+ uncompressed_position = compressed_fd.tell()
+ reference_position = reference_fd.tell()
+ self.assertEqual(uncompressed_position, reference_position)
+
+ def test_seek_cur(self):
+ for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+ file_name = self._create_compressed_file(compression_type, self.content)
+
+ compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+ read_size=self.read_block_size)
+ reference_fd = StringIO(self.content)
+
+ # Test out of bound, inbound seeking in both directions
+ for seek_position in (-1, 0, 1,
+ len(self.content) / 2,
+ len(self.content) / 2,
+ -1 * len(self.content) / 2):
+ compressed_fd.seek(seek_position, os.SEEK_CUR)
+ reference_fd.seek(seek_position, os.SEEK_CUR)
+
+ uncompressed_line = compressed_fd.readline()
+ expected_line = reference_fd.readline()
+ self.assertEqual(uncompressed_line, expected_line)
+
+ reference_position = reference_fd.tell()
+ uncompressed_position = compressed_fd.tell()
+ self.assertEqual(uncompressed_position, reference_position)
+
+ def test_read_from_end_returns_no_data(self):
+ for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+ file_name = self._create_compressed_file(compression_type, self.content)
+
+ compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+ read_size=self.read_block_size)
+
+ seek_position = 0
+ compressed_fd.seek(seek_position, os.SEEK_END)
+
+ expected_data = ''
+ uncompressed_data = compressed_fd.read(10)
+
+ self.assertEqual(uncompressed_data, expected_data)
+
+ def test_seek_outside(self):
+ for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+ file_name = self._create_compressed_file(compression_type, self.content)
+
+ compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+ read_size=self.read_block_size)
+
+ for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
+ seek_position = -1 * len(self.content) - 10
+ compressed_fd.seek(seek_position, whence)
+
+ expected_position = 0
+ uncompressed_position = compressed_fd.tell()
+ self.assertEqual(uncompressed_position, expected_position)
+
+ seek_position = len(self.content) + 20
+ compressed_fd.seek(seek_position, whence)
+
+ expected_position = len(self.content)
+ uncompressed_position = compressed_fd.tell()
+ self.assertEqual(uncompressed_position, expected_position)
+
+ def test_read_and_seek_back_to_beginning(self):
+ for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+ file_name = self._create_compressed_file(compression_type, self.content)
+ compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+ read_size=self.read_block_size)
+
+ first_pass = compressed_fd.readline()
+ compressed_fd.seek(0, os.SEEK_SET)
+ second_pass = compressed_fd.readline()
+
+ self.assertEqual(first_pass, second_pass)
+
+ def test_tell(self):
+ lines = ['line%d\n' % i for i in range(10)]
+ tmpfile = self._create_temp_file()
+ writeable = CompressedFile(open(tmpfile, 'w'))
+ current_offset = 0
+ for line in lines:
+ writeable.write(line)
+ current_offset += len(line)
+ self.assertEqual(current_offset, writeable.tell())
+
+ writeable.close()
+ readable = CompressedFile(open(tmpfile))
+ current_offset = 0
+ while True:
+ line = readable.readline()
+ current_offset += len(line)
+ self.assertEqual(current_offset, readable.tell())
+ if not line:
+ break
http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ee3b5e4..4e1c67d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -98,7 +98,7 @@ REQUIRED_PACKAGES = [
]
REQUIRED_TEST_PACKAGES = [
- 'pyhamcrest>=1.9,<2.0',
+ 'pyhamcrest>=1.9,<2.0'
]
GCP_REQUIREMENTS = [
[2/2] beam git commit: This closes #2392
Posted by ch...@apache.org.
This closes #2392
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2a2836a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2a2836a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2a2836a
Branch: refs/heads/master
Commit: e2a2836ad016fd6dbc0e06add3345db28ea08fa7
Parents: 645d0bb 10e5a22
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Apr 5 09:53:15 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 5 09:53:15 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio_test.py | 31 ---
sdks/python/apache_beam/io/filesystem.py | 122 ++++++++++--
sdks/python/apache_beam/io/filesystem_test.py | 221 +++++++++++++++++++++
sdks/python/setup.py | 2 +-
4 files changed, 327 insertions(+), 49 deletions(-)
----------------------------------------------------------------------