Posted to commits@beam.apache.org by ro...@apache.org on 2021/12/13 23:59:36 UTC

[beam] branch master updated: [BEAM-13454] Fix and test dataframe read_fwf. (#16064)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new da11b84  [BEAM-13454] Fix and test dataframe read_fwf. (#16064)
da11b84 is described below

commit da11b84ba29f75a5764826f465b5bca9fad72a0e
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Mon Dec 13 15:58:37 2021 -0800

    [BEAM-13454] Fix and test dataframe read_fwf. (#16064)
---
 sdks/python/apache_beam/dataframe/io.py      | 47 +++++++++++++++++++++++++---
 sdks/python/apache_beam/dataframe/io_test.py | 29 ++++++++++++++++-
 2 files changed, 71 insertions(+), 5 deletions(-)
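
The diff below renames _CsvSplitter to _TextFileSplitter, points read_fwf at it,
and gives _TruncatingFileHandle the __iter__/__next__/readline protocol that
pandas checks for via is_file_like. A rough usage sketch of the fixed API (the
file pattern is illustrative; the shape mirrors the test_read_fwf case added in
io_test.py below):

    import apache_beam as beam
    from apache_beam.dataframe import convert
    from apache_beam.dataframe import io

    with beam.Pipeline() as p:
      # Read fixed-width files into a deferred DataFrame, then convert the
      # result back to a PCollection of plain tuples.
      df = p | io.read_fwf('/path/to/*.fwf')
      rows = convert.to_pcollection(df) | beam.Map(tuple)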

diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index f3d436e..ab315f1 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -72,7 +72,7 @@ def read_csv(path, *args, splittable=False, **kwargs):
       args,
       kwargs,
       incremental=True,
-      splitter=_CsvSplitter(args, kwargs) if splittable else None)
+      splitter=_TextFileSplitter(args, kwargs) if splittable else None)
 
 
 def _as_pc(df, label=None):
@@ -93,7 +93,14 @@ def to_csv(df, path, transform_label=None, *args, **kwargs):
 
 @frame_base.with_docs_from(pd)
 def read_fwf(path, *args, **kwargs):
-  return _ReadFromPandas(pd.read_fwf, path, args, kwargs, incremental=True)
+  return _ReadFromPandas(
+      pd.read_fwf,
+      path,
+      args,
+      kwargs,
+      incremental=True,
+      binary=False,
+      splitter=_TextFileSplitter(args, kwargs))
 
 
 @frame_base.with_docs_from(pd)
@@ -350,7 +357,7 @@ def _maybe_encode(str_or_bytes):
     return str_or_bytes
 
 
-class _CsvSplitter(_DelimSplitter):
+class _TextFileSplitter(_DelimSplitter):
   """Splitter for dynamically sharding CSV files and newline record boundaries.
 
   Currently does not handle quoted newlines, so is off by default, but such
@@ -442,6 +449,7 @@ class _TruncatingFileHandle(object):
     self._done = False
     self._header, self._buffer = self._splitter.read_header(self._underlying)
     self._buffer_start_pos = len(self._header)
+    self._iterator = None
     start = self._tracker.current_restriction().start
     # Seek to first delimiter after the start position.
     if start > len(self._header):
@@ -471,9 +479,40 @@ class _TruncatingFileHandle(object):
 
   def __iter__(self):
     # For pandas is_file_like.
-    raise NotImplementedError()
+    return self
+
+  def __next__(self):
+    if self._iterator is None:
+      self._iterator = self._line_iterator()
+    return next(self._iterator)
+
+  def readline(self):
+    # This attribute is checked, but unused, by pandas.
+    return next(self)
+
+  def _line_iterator(self):
+    line_start = 0
+    chunk = self._read()
+    while True:
+      line_end = chunk.find(self._splitter._delim, line_start)
+      while line_end == -1:
+        more = self._read()
+        if not more:
+          if line_start < len(chunk):
+            yield chunk[line_start:]
+          return
+        chunk = chunk[line_start:] + more
+        line_start = 0
+        line_end = chunk.find(self._splitter._delim, line_start)
+      yield chunk[line_start:line_end + 1]
+      line_start = line_end + 1
 
   def read(self, size=-1):
+    if self._iterator:
+      raise NotImplementedError('Cannot call read after iterating.')
+    return self._read(size)
+
+  def _read(self, size=-1):
     if self._header:
       res = self._header
       self._header = None
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index d525b40..681642c 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -38,6 +38,7 @@ from apache_beam.dataframe import convert
 from apache_beam.dataframe import io
 from apache_beam.io import restriction_trackers
 from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class SimpleRow(typing.NamedTuple):
@@ -76,6 +77,19 @@ class IOTest(unittest.TestCase):
       if delete:
         os.remove(path)
 
+  def test_read_fwf(self):
+    input = self.temp_dir(
+        {'all.fwf': '''
+A     B
+11a   0
+37a   1
+389a  2
+    '''.strip()})
+    with beam.Pipeline() as p:
+      df = p | io.read_fwf(input + 'all.fwf')
+      rows = convert.to_pcollection(df) | beam.Map(tuple)
+      assert_that(rows, equal_to([('11a', 0), ('37a', 1), ('389a', 2)]))
+
   def test_read_write_csv(self):
     input = self.temp_dir({'1.csv': 'a,b\n1,2\n', '2.csv': 'a,b\n3,4\n'})
     output = self.temp_dir()
@@ -229,6 +243,19 @@ class IOTest(unittest.TestCase):
     self.assertGreater(
         min(len(s) for s in splits), len(numbers) * 0.9**20 * 0.1)
 
+  def _run_truncating_file_handle_iter_test(self, s, delim=' ', chunk_size=10):
+    tracker = restriction_trackers.OffsetRestrictionTracker(
+        restriction_trackers.OffsetRange(0, len(s)))
+    handle = io._TruncatingFileHandle(
+        StringIO(s), tracker, splitter=io._DelimSplitter(delim, chunk_size))
+    self.assertEqual(s, ''.join(list(handle)))
+
+  def test_truncating_filehandle_iter(self):
+    self._run_truncating_file_handle_iter_test('a b c')
+    self._run_truncating_file_handle_iter_test('aaaaaaaaaaaaaaaaaaaa b ccc')
+    self._run_truncating_file_handle_iter_test('aaa b cccccccccccccccccccc')
+    self._run_truncating_file_handle_iter_test('aaa b ccccccccccccccccc ')
+
   @parameterized.expand([
       ('defaults', {}),
       ('header', dict(header=1)),
@@ -259,7 +286,7 @@ class IOTest(unittest.TestCase):
               BytesIO(contents.encode('ascii')),
               restriction_trackers.OffsetRestrictionTracker(
                   restriction_trackers.OffsetRange(start, stop)),
-              splitter=io._CsvSplitter((), kwargs, read_chunk_size=7)),
+              splitter=io._TextFileSplitter((), kwargs, read_chunk_size=7)),
           index_col=0,
           **kwargs)
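
As an aside, the new _line_iterator above is a chunk-buffered line scan: read
fixed-size chunks, yield one delimiter-terminated record at a time, and keep any
partial record buffered until more data arrives. A standalone sketch of the same
technique (the names here are illustrative, not part of the commit):

    import io

    def iter_records(read, delim=b'\n', chunk_size=1024):
      """Yield delimiter-terminated records from a chunked read callable."""
      line_start = 0
      chunk = read(chunk_size)
      while True:
        line_end = chunk.find(delim, line_start)
        while line_end == -1:
          more = read(chunk_size)
          if not more:
            if line_start < len(chunk):
              yield chunk[line_start:]  # trailing record with no delimiter
            return
          # Drop the consumed prefix and append the newly read chunk.
          chunk = chunk[line_start:] + more
          line_start = 0
          line_end = chunk.find(delim, line_start)
        yield chunk[line_start:line_end + 1]
        line_start = line_end + 1

    # Round-trips its input, as the new test_truncating_filehandle_iter expects:
    assert b''.join(iter_records(io.BytesIO(b'a b c').read, delim=b' ')) == b'a b c'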