You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/28 18:06:25 UTC
[1/2] incubator-beam git commit: Make TextFileReader observable
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 26ff65795 -> 53ab635c7
Make TextFileReader observable
This allows future implementation of size tracking for elements in side input sources.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4054ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4054ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4054ba
Branch: refs/heads/python-sdk
Commit: 2f4054ba37da1c1100f45a572d96e7a6e2e60152
Parents: 26ff657
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 25 11:44:22 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:05:55 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f4054ba/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3afaae8..b1e091b 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -745,10 +745,12 @@ class NativeTextFileSink(iobase.NativeSink):
# TextFileReader, TextMultiFileReader.
-class TextFileReader(iobase.NativeSourceReader):
+class TextFileReader(iobase.NativeSourceReader,
+ coders.observable.ObservableMixin):
"""A reader for a text file source."""
def __init__(self, source):
+ super(TextFileReader, self).__init__()
self.source = source
self.start_offset = self.source.start_offset or 0
self.end_offset = self.source.end_offset
@@ -778,6 +780,7 @@ class TextFileReader(iobase.NativeSourceReader):
self._file.seek(self.start_offset - 1)
self.current_offset -= 1
line = self._file.readline()
+ self.notify_observers(line, is_encoded=True)
self.current_offset += len(line)
else:
self._file.seek(self.start_offset)
@@ -801,6 +804,7 @@ class TextFileReader(iobase.NativeSourceReader):
# a dynamic split request from the service.
return
line = self._file.readline()
+ self.notify_observers(line, is_encoded=True)
self.current_offset += len(line)
if self.source.strip_trailing_newlines:
line = line.rstrip('\n')
[2/2] incubator-beam git commit: Closes #726
Posted by ro...@apache.org.
Closes #726
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53ab635c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53ab635c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53ab635c
Branch: refs/heads/python-sdk
Commit: 53ab635c75e4b94b3930a601911a717ddc499efe
Parents: 26ff657 2f4054b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 11:05:56 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:05:56 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------