You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/04 02:22:47 UTC

[beam] branch master updated: [BEAM-7137] encode header to bytes when writing to file at apache_beam.io.textio.WriteToText

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a465e  [BEAM-7137] encode header to bytes when writing to file at apache_beam.io.textio.WriteToText
     new c01b012  Merge pull request #8452 from lazylynx/writetotext-header-encode
72a465e is described below

commit 72a465e1883f0ff71410226828b96d20c049b36f
Author: yoshiki.obata <yo...@gmail.com>
AuthorDate: Sun Apr 28 02:35:57 2019 +0900

    [BEAM-7137] encode header to bytes when writing to file at apache_beam.io.textio.WriteToText
---
 sdks/python/apache_beam/io/textio.py      | 2 +-
 sdks/python/apache_beam/io/textio_test.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 0a05958..340449f 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -390,7 +390,7 @@ class _TextSink(filebasedsink.FileBasedSink):
   def open(self, temp_path):
     file_handle = super(_TextSink, self).open(temp_path)
     if self._header is not None:
-      file_handle.write(self._header)
+      file_handle.write(coders.ToStringCoder().encode(self._header))
       if self._append_trailing_newlines:
         file_handle.write(b'\n')
     return file_handle
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 780107a..fc6da45 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -1149,7 +1149,7 @@ class TextSinkTest(unittest.TestCase):
   def test_write_dataflow_header(self):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
-    header_text = b'foo'
+    header_text = 'foo'
     pcoll | 'Write' >> WriteToText(  # pylint: disable=expression-not-assigned
         self.path + '.gz',
         shard_name_template='',
@@ -1160,8 +1160,8 @@ class TextSinkTest(unittest.TestCase):
     for file_name in glob.glob(self.path + '*'):
       with gzip.GzipFile(file_name, 'rb') as f:
         read_result.extend(f.read().splitlines())
-
-    self.assertEqual(read_result, [header_text] + self.lines)
+    # header_text is automatically encoded in WriteToText
+    self.assertEqual(read_result, [header_text.encode('utf-8')] + self.lines)
 
 
 if __name__ == '__main__':