You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by bh...@apache.org on 2021/03/31 00:47:15 UTC
[beam] branch master updated: [BEAM-12071] Don't re-use
WriteToPandasSink instances across windows (#14374)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 911771d [BEAM-12071] Don't re-use WriteToPandasSink instances across windows (#14374)
911771d is described below
commit 911771d655a66123d7f1a45ab9ba7dc715aa99b8
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue Mar 30 17:46:24 2021 -0700
[BEAM-12071] Don't re-use WriteToPandasSink instances across windows (#14374)
* Add (failing) windowed write test
* Don't re-use pandas sink instances across windows
---
sdks/python/apache_beam/dataframe/io.py | 2 +-
sdks/python/apache_beam/dataframe/io_test.py | 41 +++++++++++++++++++++++++++-
2 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 176e78f..2eebfaf 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -521,7 +521,7 @@ class _WriteToPandas(beam.PTransform):
return pcoll | fileio.WriteToFiles(
path=dir,
file_naming=fileio.default_file_naming(name),
- sink=_WriteToPandasFileSink(
+ sink=lambda _: _WriteToPandasFileSink(
self.writer, self.args, self.kwargs, self.incremental, self.binary))
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 9d484cc..374eb0c 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -21,7 +21,9 @@ import os
import platform
import shutil
import tempfile
+import typing
import unittest
+from datetime import datetime
from io import BytesIO
from io import StringIO
@@ -38,6 +40,11 @@ from apache_beam.io import restriction_trackers
from apache_beam.testing.util import assert_that
+class MyRow(typing.NamedTuple):
+ timestamp: int
+ value: int
+
+
@unittest.skipIf(platform.system() == 'Windows', 'BEAM-10929')
class IOTest(unittest.TestCase):
def setUp(self):
@@ -56,12 +63,14 @@ class IOTest(unittest.TestCase):
fout.write(contents)
return dir + os.path.sep
- def read_all_lines(self, pattern):
+ def read_all_lines(self, pattern, delete=False):
for path in glob.glob(pattern):
with open(path) as fin:
# TODO(Py3): yield from
for line in fin:
yield line.rstrip('\n')
+ if delete:
+ os.remove(path)
def test_read_write_csv(self):
input = self.temp_dir({'1.csv': 'a,b\n1,2\n', '2.csv': 'a,b\n3,4\n'})
@@ -304,6 +313,36 @@ X , c1, c2
with beam.Pipeline() as p:
_ = p | io.read_csv('/tmp/fake_dir/**')
+ def test_windowed_write(self):
+ output = self.temp_dir()
+ with beam.Pipeline() as p:
+ pc = (
+ p | beam.Create([MyRow(timestamp=i, value=i % 3) for i in range(20)])
+ | beam.Map(lambda v: beam.window.TimestampedValue(v, v.timestamp)).
+ with_output_types(MyRow)
+ | beam.WindowInto(
+ beam.window.FixedWindows(10)).with_output_types(MyRow))
+
+ deferred_df = convert.to_dataframe(pc)
+ deferred_df.to_csv(output + 'out.csv', index=False)
+
+ first_window_files = (
+ f'{output}out.csv-'
+ f'{datetime.utcfromtimestamp(0).isoformat()}*')
+ self.assertCountEqual(
+ ['timestamp,value'] + [f'{i},{i%3}' for i in range(10)],
+ set(self.read_all_lines(first_window_files, delete=True)))
+
+ second_window_files = (
+ f'{output}out.csv-'
+ f'{datetime.utcfromtimestamp(10).isoformat()}*')
+ self.assertCountEqual(
+ ['timestamp,value'] + [f'{i},{i%3}' for i in range(10, 20)],
+ set(self.read_all_lines(second_window_files, delete=True)))
+
+ # Check that we've read (and removed) every output file
+ self.assertEqual(len(glob.glob(f'{output}out.csv*')), 0)
+
if __name__ == '__main__':
unittest.main()