You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2021/12/13 23:59:36 UTC
[beam] branch master updated: [BEAM-13454] Fix and test dataframe read_fwf. (#16064)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new da11b84 [BEAM-13454] Fix and test dataframe read_fwf. (#16064)
da11b84 is described below
commit da11b84ba29f75a5764826f465b5bca9fad72a0e
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Mon Dec 13 15:58:37 2021 -0800
[BEAM-13454] Fix and test dataframe read_fwf. (#16064)
---
sdks/python/apache_beam/dataframe/io.py | 47 +++++++++++++++++++++++++---
sdks/python/apache_beam/dataframe/io_test.py | 29 ++++++++++++++++-
2 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index f3d436e..ab315f1 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -72,7 +72,7 @@ def read_csv(path, *args, splittable=False, **kwargs):
args,
kwargs,
incremental=True,
- splitter=_CsvSplitter(args, kwargs) if splittable else None)
+ splitter=_TextFileSplitter(args, kwargs) if splittable else None)
def _as_pc(df, label=None):
@@ -93,7 +93,14 @@ def to_csv(df, path, transform_label=None, *args, **kwargs):
@frame_base.with_docs_from(pd)
def read_fwf(path, *args, **kwargs):
- return _ReadFromPandas(pd.read_fwf, path, args, kwargs, incremental=True)
+ return _ReadFromPandas(
+ pd.read_fwf,
+ path,
+ args,
+ kwargs,
+ incremental=True,
+ binary=False,
+ splitter=_TextFileSplitter(args, kwargs))
@frame_base.with_docs_from(pd)
@@ -350,7 +357,7 @@ def _maybe_encode(str_or_bytes):
return str_or_bytes
-class _CsvSplitter(_DelimSplitter):
+class _TextFileSplitter(_DelimSplitter):
"""Splitter for dynamically sharding CSV files and newline record boundaries.
Currently does not handle quoted newlines, so is off by default, but such
@@ -442,6 +449,7 @@ class _TruncatingFileHandle(object):
self._done = False
self._header, self._buffer = self._splitter.read_header(self._underlying)
self._buffer_start_pos = len(self._header)
+ self._iterator = None
start = self._tracker.current_restriction().start
# Seek to first delimiter after the start position.
if start > len(self._header):
@@ -471,9 +479,40 @@ class _TruncatingFileHandle(object):
def __iter__(self):
# For pandas is_file_like.
- raise NotImplementedError()
+ return self
+
+ def __next__(self):
+ if self._iterator is None:
+ self._iterator = self._line_iterator()
+ return next(self._iterator)
+
+ def readline(self):
+ # This attribute is checked, but unused, by pandas.
+ return next(self)
+
+ def _line_iterator(self):
+ line_start = 0
+ chunk = self._read()
+ while True:
+ line_end = chunk.find(self._splitter._delim, line_start)
+ while line_end == -1:
+ more = self._read()
+ if not more:
+ if line_start < len(chunk):
+ yield chunk[line_start:]
+ return
+ chunk = chunk[line_start:] + more
+ line_start = 0
+ line_end = chunk.find(self._splitter._delim, line_start)
+ yield chunk[line_start:line_end + 1]
+ line_start = line_end + 1
def read(self, size=-1):
+ if self._iterator:
+ raise NotImplementedError('Cannot call read after iterating.')
+ return self._read(size)
+
+ def _read(self, size=-1):
if self._header:
res = self._header
self._header = None
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index d525b40..681642c 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -38,6 +38,7 @@ from apache_beam.dataframe import convert
from apache_beam.dataframe import io
from apache_beam.io import restriction_trackers
from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class SimpleRow(typing.NamedTuple):
@@ -76,6 +77,19 @@ class IOTest(unittest.TestCase):
if delete:
os.remove(path)
+ def test_read_fwf(self):
+ input = self.temp_dir(
+ {'all.fwf': '''
+A B
+11a 0
+37a 1
+389a 2
+ '''.strip()})
+ with beam.Pipeline() as p:
+ df = p | io.read_fwf(input + 'all.fwf')
+ rows = convert.to_pcollection(df) | beam.Map(tuple)
+ assert_that(rows, equal_to([('11a', 0), ('37a', 1), ('389a', 2)]))
+
def test_read_write_csv(self):
input = self.temp_dir({'1.csv': 'a,b\n1,2\n', '2.csv': 'a,b\n3,4\n'})
output = self.temp_dir()
@@ -229,6 +243,19 @@ class IOTest(unittest.TestCase):
self.assertGreater(
min(len(s) for s in splits), len(numbers) * 0.9**20 * 0.1)
+ def _run_truncating_file_handle_iter_test(self, s, delim=' ', chunk_size=10):
+ tracker = restriction_trackers.OffsetRestrictionTracker(
+ restriction_trackers.OffsetRange(0, len(s)))
+ handle = io._TruncatingFileHandle(
+ StringIO(s), tracker, splitter=io._DelimSplitter(delim, chunk_size))
+ self.assertEqual(s, ''.join(list(handle)))
+
+ def test_truncating_filehandle_iter(self):
+ self._run_truncating_file_handle_iter_test('a b c')
+ self._run_truncating_file_handle_iter_test('aaaaaaaaaaaaaaaaaaaa b ccc')
+ self._run_truncating_file_handle_iter_test('aaa b cccccccccccccccccccc')
+ self._run_truncating_file_handle_iter_test('aaa b ccccccccccccccccc ')
+
@parameterized.expand([
('defaults', {}),
('header', dict(header=1)),
@@ -259,7 +286,7 @@ class IOTest(unittest.TestCase):
BytesIO(contents.encode('ascii')),
restriction_trackers.OffsetRestrictionTracker(
restriction_trackers.OffsetRange(start, stop)),
- splitter=io._CsvSplitter((), kwargs, read_chunk_size=7)),
+ splitter=io._TextFileSplitter((), kwargs, read_chunk_size=7)),
index_col=0,
**kwargs)