Posted to commits@beam.apache.org by bh...@apache.org on 2021/03/31 00:47:15 UTC

[beam] branch master updated: [BEAM-12071] Don't re-use WriteToPandasSink instances across windows (#14374)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 911771d  [BEAM-12071] Don't re-use WriteToPandasSink instances across windows (#14374)
911771d is described below

commit 911771d655a66123d7f1a45ab9ba7dc715aa99b8
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue Mar 30 17:46:24 2021 -0700

    [BEAM-12071] Don't re-use WriteToPandasSink instances across windows (#14374)
    
    * Add (failing) windowed write test
    
    * Don't re-use pandas sink instances across windows
---
 sdks/python/apache_beam/dataframe/io.py      |  2 +-
 sdks/python/apache_beam/dataframe/io_test.py | 41 +++++++++++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 176e78f..2eebfaf 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -521,7 +521,7 @@ class _WriteToPandas(beam.PTransform):
     return pcoll | fileio.WriteToFiles(
         path=dir,
         file_naming=fileio.default_file_naming(name),
-        sink=_WriteToPandasFileSink(
+        sink=lambda _: _WriteToPandasFileSink(
             self.writer, self.args, self.kwargs, self.incremental, self.binary))
 
 
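The one-line change above is the whole fix. fileio.WriteToFiles accepts either a
FileSink instance or a callable that returns one; a FileSink carries per-file
state (notably the file handle set in open()), so passing a single shared
instance meant the same sink object was handed every file WriteToFiles opened,
and state from one window's file leaked into the next. Wrapping the constructor
in a lambda makes WriteToFiles build a fresh sink for each file. A minimal
sketch of the same pattern, using a hypothetical JsonSink that is not part of
this patch:

    import apache_beam as beam
    from apache_beam.io import fileio

    class JsonSink(fileio.FileSink):  # hypothetical sink, for illustration only
      def open(self, fh):
        # Per-file state: re-using one instance across files/windows would
        # leave this handle pointing at an already-closed file.
        self._fh = fh

      def write(self, record):
        self._fh.write(repr(record).encode('utf-8') + b'\n')

      def flush(self):
        pass

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(['a', 'b'])
          # A callable sink: WriteToFiles invokes it for each file it opens,
          # so every window gets a fresh JsonSink instance.
          | fileio.WriteToFiles(path='/tmp/out', sink=lambda dest: JsonSink()))

Passing JsonSink() directly instead of the lambda would reintroduce the bug this
commit fixes.
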
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 9d484cc..374eb0c 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -21,7 +21,9 @@ import os
 import platform
 import shutil
 import tempfile
+import typing
 import unittest
+from datetime import datetime
 from io import BytesIO
 from io import StringIO
 
@@ -38,6 +40,11 @@ from apache_beam.io import restriction_trackers
 from apache_beam.testing.util import assert_that
 
 
+class MyRow(typing.NamedTuple):
+  timestamp: int
+  value: int
+
+
 @unittest.skipIf(platform.system() == 'Windows', 'BEAM-10929')
 class IOTest(unittest.TestCase):
   def setUp(self):
@@ -56,12 +63,14 @@ class IOTest(unittest.TestCase):
           fout.write(contents)
     return dir + os.path.sep
 
-  def read_all_lines(self, pattern):
+  def read_all_lines(self, pattern, delete=False):
     for path in glob.glob(pattern):
       with open(path) as fin:
         # TODO(Py3): yield from
         for line in fin:
           yield line.rstrip('\n')
+      if delete:
+        os.remove(path)
 
   def test_read_write_csv(self):
     input = self.temp_dir({'1.csv': 'a,b\n1,2\n', '2.csv': 'a,b\n3,4\n'})
@@ -304,6 +313,36 @@ X     , c1, c2
       with beam.Pipeline() as p:
         _ = p | io.read_csv('/tmp/fake_dir/**')
 
+  def test_windowed_write(self):
+    output = self.temp_dir()
+    with beam.Pipeline() as p:
+      pc = (
+          p | beam.Create([MyRow(timestamp=i, value=i % 3) for i in range(20)])
+          | beam.Map(lambda v: beam.window.TimestampedValue(v, v.timestamp)).
+          with_output_types(MyRow)
+          | beam.WindowInto(
+              beam.window.FixedWindows(10)).with_output_types(MyRow))
+
+      deferred_df = convert.to_dataframe(pc)
+      deferred_df.to_csv(output + 'out.csv', index=False)
+
+    first_window_files = (
+        f'{output}out.csv-'
+        f'{datetime.utcfromtimestamp(0).isoformat()}*')
+    self.assertCountEqual(
+        ['timestamp,value'] + [f'{i},{i%3}' for i in range(10)],
+        set(self.read_all_lines(first_window_files, delete=True)))
+
+    second_window_files = (
+        f'{output}out.csv-'
+        f'{datetime.utcfromtimestamp(10).isoformat()}*')
+    self.assertCountEqual(
+        ['timestamp,value'] + [f'{i},{i%3}' for i in range(10, 20)],
+        set(self.read_all_lines(second_window_files, delete=True)))
+
+    # Check that we've read (and removed) every output file
+    self.assertEqual(len(glob.glob(f'{output}out.csv*')), 0)
+
 
 if __name__ == '__main__':
   unittest.main()
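
For reference, the globs in the new test work because fileio's default file
naming embeds the window's start timestamp in each file name when the window is
not the global window, so each FixedWindows(10) window writes its own set of
files. A small sanity check of the prefix the first glob matches (assuming that
naming scheme, as the test itself does):

    from datetime import datetime

    # The first FixedWindows(10) window covers [0, 10); the test expects its
    # output files to begin with this prefix:
    prefix = 'out.csv-' + datetime.utcfromtimestamp(0).isoformat()
    assert prefix == 'out.csv-1970-01-01T00:00:00'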