You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/11 00:56:39 UTC

[1/2] beam git commit: Compressed files with a missing final EOL create a fake element

Repository: beam
Updated Branches:
  refs/heads/python-sdk 2d190a29b -> 86d420376


Compressed files with a missing final EOL create a fake element


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb80e09b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb80e09b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb80e09b

Branch: refs/heads/python-sdk
Commit: fb80e09b80de04467cef795ef16b12626dec7471
Parents: 2d190a2
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jan 10 10:40:29 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 10 16:56:28 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/textio.py      | 10 ++++++
 sdks/python/apache_beam/io/textio_test.py | 44 ++++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fb80e09b/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 09cf024..4cdab12 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -110,6 +110,16 @@ class _TextSource(filebasedsource.FileBasedSource):
       while range_tracker.try_claim(next_record_start_position):
         record, num_bytes_to_next_record = self._read_record(file_to_read,
                                                              read_buffer)
+
+        # For compressed text files that use an unsplittable OffsetRangeTracker
+        # with infinity as the end position, above 'try_claim()' invocation
+        # would pass for an empty record at the end of file that is not
+        # followed by a new line character. Since such a record is at the last
+        # position of a file, it should not be a part of the considered range.
+        # We do this check to ignore such records.
+        if len(record) == 0 and num_bytes_to_next_record < 0:
+          break
+
         yield self._coder.decode(record)
         if num_bytes_to_next_record < 0:
           break

http://git-wip-us.apache.org/repos/asf/beam/blob/fb80e09b/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 39ddec4..877e190 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -56,11 +56,12 @@ class TextSourceTest(unittest.TestCase):
   DEFAULT_NUM_RECORDS = 100
 
   def _run_read_test(self, file_or_pattern, expected_data,
-                     buffer_size=DEFAULT_NUM_RECORDS):
+                     buffer_size=DEFAULT_NUM_RECORDS,
+                     compression=CompressionTypes.UNCOMPRESSED):
     # Since each record usually takes more than 1 byte, default buffer size is
     # smaller than the total size of the file. This is done to
     # increase test coverage for cases that hit the buffer boundary.
-    source = TextSource(file_or_pattern, 0, CompressionTypes.UNCOMPRESSED,
+    source = TextSource(file_or_pattern, 0, compression,
                         True, coders.StrUtf8Coder(), buffer_size)
     range_tracker = source.get_range_tracker(None, None)
     read_data = [record for record in source.read(range_tracker)]
@@ -128,6 +129,45 @@ class TextSourceTest(unittest.TestCase):
     # without an end of line character.
     self._run_read_test(file_name, [])
 
+  def test_read_single_file_last_line_no_eol_gzip(self):
+    file_name, expected_data = write_data(
+        TextSourceTest.DEFAULT_NUM_RECORDS,
+        eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+    gzip_file_name = file_name + '.gz'
+    with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+      dst.writelines(src)
+
+    assert len(expected_data) == TextSourceTest.DEFAULT_NUM_RECORDS
+    self._run_read_test(gzip_file_name, expected_data,
+                        compression=CompressionTypes.GZIP)
+
+  def test_read_single_file_single_line_no_eol_gzip(self):
+    file_name, expected_data = write_data(
+        1, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+    gzip_file_name = file_name + '.gz'
+    with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+      dst.writelines(src)
+
+    assert len(expected_data) == 1
+    self._run_read_test(gzip_file_name, expected_data,
+                        compression=CompressionTypes.GZIP)
+
+  def test_read_empty_single_file_no_eol_gzip(self):
+    file_name, written_data = write_data(
+        1, no_data=True, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+    gzip_file_name = file_name + '.gz'
+    with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+      dst.writelines(src)
+
+    assert len(written_data) == 1
+    # written data has a single entry with an empty string. Reading the source
+    # should not produce anything since we only wrote a single empty string
+    # without an end of line character.
+    self._run_read_test(gzip_file_name, [], compression=CompressionTypes.GZIP)
+
   def test_read_single_file_with_empty_lines(self):
     file_name, expected_data = write_data(
         TextSourceTest.DEFAULT_NUM_RECORDS, no_data=True, eol=EOL.LF)


[2/2] beam git commit: Closes #1757

Posted by ro...@apache.org.
Closes #1757


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86d42037
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86d42037
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86d42037

Branch: refs/heads/python-sdk
Commit: 86d420376b14a9ff19b4f078470d40a7fd4267a5
Parents: 2d190a2 fb80e09
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 10 16:56:29 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 10 16:56:29 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/textio.py      | 10 ++++++
 sdks/python/apache_beam/io/textio_test.py | 44 ++++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------