You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/11 00:56:39 UTC
[1/2] beam git commit: Compressed file with missing last EOF create a
fake element
Repository: beam
Updated Branches:
refs/heads/python-sdk 2d190a29b -> 86d420376
Compressed file with missing last EOF create a fake element
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb80e09b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb80e09b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb80e09b
Branch: refs/heads/python-sdk
Commit: fb80e09b80de04467cef795ef16b12626dec7471
Parents: 2d190a2
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jan 10 10:40:29 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 10 16:56:28 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/textio.py | 10 ++++++
sdks/python/apache_beam/io/textio_test.py | 44 ++++++++++++++++++++++++--
2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fb80e09b/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 09cf024..4cdab12 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -110,6 +110,16 @@ class _TextSource(filebasedsource.FileBasedSource):
while range_tracker.try_claim(next_record_start_position):
record, num_bytes_to_next_record = self._read_record(file_to_read,
read_buffer)
+
+ # For compressed text files that use an unsplittable OffsetRangeTracker
+ # with infinity as the end position, the 'try_claim()' invocation above
+ # would succeed for an empty record at the end of the file that is not
+ # followed by a newline character. Since such a record is at the last
+ # position of the file, it should not be part of the considered range.
+ # We do this check to ignore such records.
+ if len(record) == 0 and num_bytes_to_next_record < 0:
+ break
+
yield self._coder.decode(record)
if num_bytes_to_next_record < 0:
break
http://git-wip-us.apache.org/repos/asf/beam/blob/fb80e09b/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 39ddec4..877e190 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -56,11 +56,12 @@ class TextSourceTest(unittest.TestCase):
DEFAULT_NUM_RECORDS = 100
def _run_read_test(self, file_or_pattern, expected_data,
- buffer_size=DEFAULT_NUM_RECORDS):
+ buffer_size=DEFAULT_NUM_RECORDS,
+ compression=CompressionTypes.UNCOMPRESSED):
# Since each record usually takes more than 1 byte, default buffer size is
# smaller than the total size of the file. This is done to
# increase test coverage for cases that hit the buffer boundary.
- source = TextSource(file_or_pattern, 0, CompressionTypes.UNCOMPRESSED,
+ source = TextSource(file_or_pattern, 0, compression,
True, coders.StrUtf8Coder(), buffer_size)
range_tracker = source.get_range_tracker(None, None)
read_data = [record for record in source.read(range_tracker)]
@@ -128,6 +129,45 @@ class TextSourceTest(unittest.TestCase):
# without an end of line character.
self._run_read_test(file_name, [])
+ def test_read_single_file_last_line_no_eol_gzip(self):
+ file_name, expected_data = write_data(
+ TextSourceTest.DEFAULT_NUM_RECORDS,
+ eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+ gzip_file_name = file_name + '.gz'
+ with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+ dst.writelines(src)
+
+ assert len(expected_data) == TextSourceTest.DEFAULT_NUM_RECORDS
+ self._run_read_test(gzip_file_name, expected_data,
+ compression=CompressionTypes.GZIP)
+
+ def test_read_single_file_single_line_no_eol_gzip(self):
+ file_name, expected_data = write_data(
+ 1, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+ gzip_file_name = file_name + '.gz'
+ with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+ dst.writelines(src)
+
+ assert len(expected_data) == 1
+ self._run_read_test(gzip_file_name, expected_data,
+ compression=CompressionTypes.GZIP)
+
+ def test_read_empty_single_file_no_eol_gzip(self):
+ file_name, written_data = write_data(
+ 1, no_data=True, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)
+
+ gzip_file_name = file_name + '.gz'
+ with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
+ dst.writelines(src)
+
+ assert len(written_data) == 1
+ # written data has a single entry with an empty string. Reading the source
+ # should not produce anything since we only wrote a single empty string
+ # without an end of line character.
+ self._run_read_test(gzip_file_name, [], compression=CompressionTypes.GZIP)
+
def test_read_single_file_with_empty_lines(self):
file_name, expected_data = write_data(
TextSourceTest.DEFAULT_NUM_RECORDS, no_data=True, eol=EOL.LF)
[2/2] beam git commit: Closes #1757
Posted by ro...@apache.org.
Closes #1757
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86d42037
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86d42037
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86d42037
Branch: refs/heads/python-sdk
Commit: 86d420376b14a9ff19b4f078470d40a7fd4267a5
Parents: 2d190a2 fb80e09
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 10 16:56:29 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 10 16:56:29 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/textio.py | 10 ++++++
sdks/python/apache_beam/io/textio_test.py | 44 ++++++++++++++++++++++++--
2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------