You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/28 18:06:25 UTC

[1/2] incubator-beam git commit: Make TextFileReader observable

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 26ff65795 -> 53ab635c7


Make TextFileReader observable

This allows future implementation of size tracking for elements in side input sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4054ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4054ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4054ba

Branch: refs/heads/python-sdk
Commit: 2f4054ba37da1c1100f45a572d96e7a6e2e60152
Parents: 26ff657
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 25 11:44:22 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:05:55 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f4054ba/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3afaae8..b1e091b 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -745,10 +745,12 @@ class NativeTextFileSink(iobase.NativeSink):
 # TextFileReader, TextMultiFileReader.
 
 
-class TextFileReader(iobase.NativeSourceReader):
+class TextFileReader(iobase.NativeSourceReader,
+                     coders.observable.ObservableMixin):
   """A reader for a text file source."""
 
   def __init__(self, source):
+    super(TextFileReader, self).__init__()
     self.source = source
     self.start_offset = self.source.start_offset or 0
     self.end_offset = self.source.end_offset
@@ -778,6 +780,7 @@ class TextFileReader(iobase.NativeSourceReader):
       self._file.seek(self.start_offset - 1)
       self.current_offset -= 1
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
     else:
       self._file.seek(self.start_offset)
@@ -801,6 +804,7 @@ class TextFileReader(iobase.NativeSourceReader):
         # a dynamic split request from the service.
         return
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
       if self.source.strip_trailing_newlines:
         line = line.rstrip('\n')


[2/2] incubator-beam git commit: Closes #726

Posted by ro...@apache.org.
Closes #726


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53ab635c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53ab635c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53ab635c

Branch: refs/heads/python-sdk
Commit: 53ab635c75e4b94b3930a601911a717ddc499efe
Parents: 26ff657 2f4054b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 11:05:56 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:05:56 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------