You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/05 16:54:00 UTC

[1/2] beam git commit: [BEAM-778] Make filesystem._CompressedFile seekable.

Repository: beam
Updated Branches:
  refs/heads/master 645d0bba9 -> e2a2836ad


[BEAM-778] Make filesystem._CompressedFile seekable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10e5a22b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10e5a22b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10e5a22b

Branch: refs/heads/master
Commit: 10e5a22b8479bff249c708194e327137458ca2b3
Parents: 645d0bb
Author: Tibor Kiss <ti...@gmail.com>
Authored: Fri Mar 31 07:11:07 2017 +0200
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 5 09:51:42 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio_test.py     |  31 ---
 sdks/python/apache_beam/io/filesystem.py      | 122 ++++++++++--
 sdks/python/apache_beam/io/filesystem_test.py | 221 +++++++++++++++++++++
 sdks/python/setup.py                          |   2 +-
 4 files changed, 327 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 504a2b9..2409873 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -30,7 +30,6 @@ import hamcrest as hc
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
-from apache_beam.io.filesystem import CompressedFile
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -70,36 +69,6 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase):
     return file_name
 
 
-class TestCompressedFile(_TestCaseWithTempDirCleanUp):
-
-  def test_seekable(self):
-    readable = CompressedFile(open(self._create_temp_file(), 'r'))
-    self.assertFalse(readable.seekable)
-
-    writeable = CompressedFile(open(self._create_temp_file(), 'w'))
-    self.assertFalse(writeable.seekable)
-
-  def test_tell(self):
-    lines = ['line%d\n' % i for i in range(10)]
-    tmpfile = self._create_temp_file()
-    writeable = CompressedFile(open(tmpfile, 'w'))
-    current_offset = 0
-    for line in lines:
-      writeable.write(line)
-      current_offset += len(line)
-      self.assertEqual(current_offset, writeable.tell())
-
-    writeable.close()
-    readable = CompressedFile(open(tmpfile))
-    current_offset = 0
-    while True:
-      line = readable.readline()
-      current_offset += len(line)
-      self.assertEqual(current_offset, readable.tell())
-      if not line:
-        break
-
-
 class MyFileSink(fileio.FileSink):
 
   def open(self, temp_path):

http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index b0d2f48..e6c3c29 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -23,7 +23,10 @@ import bz2
 import cStringIO
 import os
 import zlib
+import logging
+import time
 
+logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
@@ -79,7 +82,8 @@ class CompressionTypes(object):
 
 
 class CompressedFile(object):
-  """Somewhat limited file wrapper for easier handling of compressed files."""
+  """File wrapper for easier handling of compressed files."""
+  # XXX: This class is not thread safe in the read path.
 
   # The bit mask to use for the wbits parameters of the zlib compressor and
   # decompressor objects.
@@ -107,6 +111,7 @@ class CompressedFile(object):
       raise ValueError('File object must be at position 0 but was %d' %
                        self._file.tell())
     self._uncompressed_position = 0
+    self._uncompressed_size = None
 
     if self.readable():
       self._read_size = read_size
@@ -114,24 +119,30 @@ class CompressedFile(object):
       self._read_position = 0
       self._read_eof = False
 
-      if self._compression_type == CompressionTypes.BZIP2:
-        self._decompressor = bz2.BZ2Decompressor()
-      else:
-        assert self._compression_type == CompressionTypes.GZIP
-        self._decompressor = zlib.decompressobj(self._gzip_mask)
+      self._initialize_decompressor()
     else:
       self._decompressor = None
 
     if self.writeable():
-      if self._compression_type == CompressionTypes.BZIP2:
-        self._compressor = bz2.BZ2Compressor()
-      else:
-        assert self._compression_type == CompressionTypes.GZIP
-        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
-                                            zlib.DEFLATED, self._gzip_mask)
+      self._initialize_compressor()
     else:
       self._compressor = None
 
+  def _initialize_decompressor(self):
+    if self._compression_type == CompressionTypes.BZIP2:
+      self._decompressor = bz2.BZ2Decompressor()
+    else:
+      assert self._compression_type == CompressionTypes.GZIP
+      self._decompressor = zlib.decompressobj(self._gzip_mask)
+
+  def _initialize_compressor(self):
+    if self._compression_type == CompressionTypes.BZIP2:
+      self._compressor = bz2.BZ2Compressor()
+    else:
+      assert self._compression_type == CompressionTypes.GZIP
+      self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+                                          zlib.DEFLATED, self._gzip_mask)
+
   def readable(self):
     mode = self._file.mode
     return 'r' in mode or 'a' in mode
@@ -158,9 +169,7 @@ class CompressedFile(object):
       # but without dropping any previous held data.
       self._read_buffer.seek(self._read_position)
       data = self._read_buffer.read()
-      self._read_position = 0
-      self._read_buffer.seek(0)
-      self._read_buffer.truncate(0)
+      self._clear_read_buffer()
       self._read_buffer.write(data)
 
     while not self._read_eof and (self._read_buffer.tell() - self._read_position
@@ -250,8 +259,87 @@ class CompressedFile(object):
 
   @property
   def seekable(self):
-    # TODO: Add support for seeking to a file position.
-    return False
+    return self._file.mode == 'r'
+
+  def _clear_read_buffer(self):
+    """Clears the read buffer by removing all the contents and
+    resetting _read_position to 0"""
+    self._read_position = 0
+    self._read_buffer.seek(0)
+    self._read_buffer.truncate(0)
+
+  def _rewind_file(self):
+    """Seeks to the beginning of the input file. Input file's EOF marker
+    is cleared and _uncompressed_position is reset to zero"""
+    self._file.seek(0, os.SEEK_SET)
+    self._read_eof = False
+    self._uncompressed_position = 0
+
+  def _rewind(self):
+    """Seeks to the beginning of the input file and resets the internal read
+    buffer. The decompressor object is re-initialized to ensure that no data
+    is left in its buffer."""
+    self._clear_read_buffer()
+    self._rewind_file()
+
+    # Re-initialize decompressor to clear any data buffered prior to rewind
+    self._initialize_decompressor()
+
+  def seek(self, offset, whence=os.SEEK_SET):
+    """Set the file's current offset.
+
+    Seeking behavior:
+      * when seeking from the end (SEEK_END), the whole file is decompressed
+        once to determine its size. Therefore it is preferred to use
+        SEEK_SET or SEEK_CUR to avoid the processing overhead
+      * seeking backwards from the current position rewinds the file to 0
+        and decompresses the chunks to the requested offset
+      * seeking is only supported in files opened for reading
+      * if the new offset is out of bounds, it is adjusted to either 0 or EOF.
+
+    Args:
+      offset: seek offset in the uncompressed content represented as number
+      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
+        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
+        (seek relative to the end, offset should be negative).
+
+    Raises:
+      IOError: When this buffer is closed.
+      ValueError: When whence is invalid or the file is not seekable
+    """
+    if whence == os.SEEK_SET:
+      absolute_offset = offset
+    elif whence == os.SEEK_CUR:
+      absolute_offset = self._uncompressed_position + offset
+    elif whence == os.SEEK_END:
+      # Determine and cache the uncompressed size of the file
+      if not self._uncompressed_size:
+        logger.warn("Seeking relative from end of file is requested. "
+                    "Need to decompress the whole file once to determine "
+                    "its size. This might take a while...")
+        uncompress_start_time = time.time()
+        while self.read(self._read_size):
+          pass
+        uncompress_end_time = time.time()
+        logger.warn("Full file decompression for seek from end took %.2f secs",
+                    (uncompress_end_time - uncompress_start_time))
+        self._uncompressed_size = self._uncompressed_position
+      absolute_offset = self._uncompressed_size + offset
+    else:
+      raise ValueError("Whence mode %r is invalid." % whence)
+
+    # Determine how many bytes need to be read before we reach
+    # the requested offset. Rewind if we already passed the position.
+    if absolute_offset < self._uncompressed_position:
+      self._rewind()
+    bytes_to_skip = absolute_offset - self._uncompressed_position
+
+    # Read until the desired position is reached or EOF occurs.
+    while bytes_to_skip:
+      data = self.read(min(self._read_size, bytes_to_skip))
+      if not data:
+        break
+      bytes_to_skip -= len(data)
 
   def tell(self):
     """Returns current position in uncompressed file."""

http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/apache_beam/io/filesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
new file mode 100644
index 0000000..168925d
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -0,0 +1,221 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for filesystem module."""
+import shutil
+import os
+import unittest
+import tempfile
+import bz2
+import gzip
+from StringIO import StringIO
+
+from apache_beam.io.filesystem import CompressedFile, CompressionTypes
+
+
+class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+  """Base class for TestCases that deals with TempDir clean-up.
+
+  Inherited test cases will call self._new_tempdir() to start a temporary dir
+  which will be deleted at the end of the tests (when tearDown() is called).
+  """
+
+  def setUp(self):
+    self._tempdirs = []
+
+  def tearDown(self):
+    for path in self._tempdirs:
+      if os.path.exists(path):
+        shutil.rmtree(path)
+    self._tempdirs = []
+
+  def _new_tempdir(self):
+    result = tempfile.mkdtemp()
+    self._tempdirs.append(result)
+    return result
+
+  def _create_temp_file(self, name='', suffix=''):
+    if not name:
+      name = tempfile.template
+    file_name = tempfile.NamedTemporaryFile(
+        delete=False, prefix=name,
+        dir=self._new_tempdir(), suffix=suffix).name
+    return file_name
+
+
+class TestCompressedFile(_TestCaseWithTempDirCleanUp):
+  content = """- the BEAM -
+How things really are we would like to know.
+Does
+     Time
+          flow, is it elastic, or is it
+atomized in instants hammered around the
+    clock's face? ...
+- May Swenson"""
+
+  # Keep the read block size small so that we exercise the seek functionality
+  # in the compressed file, not just in the internal buffer
+  read_block_size = 4
+
+  def _create_compressed_file(self, compression_type, content,
+                              name='', suffix=''):
+    file_name = self._create_temp_file(name, suffix)
+
+    if compression_type == CompressionTypes.BZIP2:
+      compress_factory = bz2.BZ2File
+    elif compression_type == CompressionTypes.GZIP:
+      compress_factory = gzip.open
+    else:
+      assert False, "Invalid compression type: %s" % compression_type
+
+    with compress_factory(file_name, 'w') as f:
+      f.write(content)
+
+    return file_name
+
+  def test_seekable_enabled_on_read(self):
+    readable = CompressedFile(open(self._create_temp_file(), 'r'))
+    self.assertTrue(readable.seekable)
+
+  def test_seekable_disabled_on_write(self):
+    writeable = CompressedFile(open(self._create_temp_file(), 'w'))
+    self.assertFalse(writeable.seekable)
+
+  def test_seekable_disabled_on_append(self):
+    writeable = CompressedFile(open(self._create_temp_file(), 'a'))
+    self.assertFalse(writeable.seekable)
+
+  def test_seek_set(self):
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+      file_name = self._create_compressed_file(compression_type, self.content)
+
+      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+                                     read_size=self.read_block_size)
+      reference_fd = StringIO(self.content)
+
+      # Note: content (readline) check must come before position (tell) check
+      # because StringIO's tell() reports out-of-bounds positions (if we seek
+      # beyond the file) up until a real read occurs.
+      # _CompressedFile.tell() always stays within the bounds of the
+      # uncompressed content.
+      for seek_position in (-1, 0, 1,
+                            len(self.content)-1, len(self.content),
+                            len(self.content) + 1):
+        compressed_fd.seek(seek_position, os.SEEK_SET)
+        reference_fd.seek(seek_position, os.SEEK_SET)
+
+        uncompressed_line = compressed_fd.readline()
+        reference_line = reference_fd.readline()
+        self.assertEqual(uncompressed_line, reference_line)
+
+        uncompressed_position = compressed_fd.tell()
+        reference_position = reference_fd.tell()
+        self.assertEqual(uncompressed_position, reference_position)
+
+  def test_seek_cur(self):
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+      file_name = self._create_compressed_file(compression_type, self.content)
+
+      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+                                     read_size=self.read_block_size)
+      reference_fd = StringIO(self.content)
+
+      # Test out-of-bounds and in-bounds seeking in both directions
+      for seek_position in (-1, 0, 1,
+                            len(self.content) / 2,
+                            len(self.content) / 2,
+                            -1 * len(self.content) / 2):
+        compressed_fd.seek(seek_position, os.SEEK_CUR)
+        reference_fd.seek(seek_position, os.SEEK_CUR)
+
+        uncompressed_line = compressed_fd.readline()
+        expected_line = reference_fd.readline()
+        self.assertEqual(uncompressed_line, expected_line)
+
+        reference_position = reference_fd.tell()
+        uncompressed_position = compressed_fd.tell()
+        self.assertEqual(uncompressed_position, reference_position)
+
+  def test_read_from_end_returns_no_data(self):
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+      file_name = self._create_compressed_file(compression_type, self.content)
+
+      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+                                     read_size=self.read_block_size)
+
+      seek_position = 0
+      compressed_fd.seek(seek_position, os.SEEK_END)
+
+      expected_data = ''
+      uncompressed_data = compressed_fd.read(10)
+
+      self.assertEqual(uncompressed_data, expected_data)
+
+  def test_seek_outside(self):
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+      file_name = self._create_compressed_file(compression_type, self.content)
+
+      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+                                     read_size=self.read_block_size)
+
+      for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
+        seek_position = -1 * len(self.content) - 10
+        compressed_fd.seek(seek_position, whence)
+
+        expected_position = 0
+        uncompressed_position = compressed_fd.tell()
+        self.assertEqual(uncompressed_position, expected_position)
+
+        seek_position = len(self.content) + 20
+        compressed_fd.seek(seek_position, whence)
+
+        expected_position = len(self.content)
+        uncompressed_position = compressed_fd.tell()
+        self.assertEqual(uncompressed_position, expected_position)
+
+  def test_read_and_seek_back_to_beginning(self):
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+      file_name = self._create_compressed_file(compression_type, self.content)
+      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
+                                     read_size=self.read_block_size)
+
+      first_pass = compressed_fd.readline()
+      compressed_fd.seek(0, os.SEEK_SET)
+      second_pass = compressed_fd.readline()
+
+      self.assertEqual(first_pass, second_pass)
+
+  def test_tell(self):
+    lines = ['line%d\n' % i for i in range(10)]
+    tmpfile = self._create_temp_file()
+    writeable = CompressedFile(open(tmpfile, 'w'))
+    current_offset = 0
+    for line in lines:
+      writeable.write(line)
+      current_offset += len(line)
+      self.assertEqual(current_offset, writeable.tell())
+
+    writeable.close()
+    readable = CompressedFile(open(tmpfile))
+    current_offset = 0
+    while True:
+      line = readable.readline()
+      current_offset += len(line)
+      self.assertEqual(current_offset, readable.tell())
+      if not line:
+        break

http://git-wip-us.apache.org/repos/asf/beam/blob/10e5a22b/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ee3b5e4..4e1c67d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -98,7 +98,7 @@ REQUIRED_PACKAGES = [
     ]
 
 REQUIRED_TEST_PACKAGES = [
-    'pyhamcrest>=1.9,<2.0',
+    'pyhamcrest>=1.9,<2.0'
     ]
 
 GCP_REQUIREMENTS = [


[2/2] beam git commit: This closes #2392

Posted by ch...@apache.org.
This closes #2392


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2a2836a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2a2836a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2a2836a

Branch: refs/heads/master
Commit: e2a2836ad016fd6dbc0e06add3345db28ea08fa7
Parents: 645d0bb 10e5a22
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Apr 5 09:53:15 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 5 09:53:15 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio_test.py     |  31 ---
 sdks/python/apache_beam/io/filesystem.py      | 122 ++++++++++--
 sdks/python/apache_beam/io/filesystem_test.py | 221 +++++++++++++++++++++
 sdks/python/setup.py                          |   2 +-
 4 files changed, 327 insertions(+), 49 deletions(-)
----------------------------------------------------------------------