You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/02 18:15:54 UTC

[GitHub] [beam] robertwb commented on a change in pull request #13443: [BEAM-11361] Dynamic splitting of SDF IOs.

robertwb commented on a change in pull request #13443:
URL: https://github.com/apache/beam/pull/13443#discussion_r534380958



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -181,39 +193,153 @@ def expand(self, root):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), than this
+  would give a view of a 500-byte file consisting of bytes bytes 110 to 609
+  (inclusive) of the underlying file.
+
+  As with all SDF trackers, the endpoint may change dynamically during reading.
+  """
+  def __init__(
+      self,
+      underlying,
+      tracker,
+      delim=b'\n',
+      chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
+    self._underlying = underlying
+    self._tracker = tracker
+    self._buffer_start_pos = self._tracker.current_restriction().start
+    self._delim = delim
+    self._chunk_size = chunk_size
+
+    self._buffer = self._empty = self._delim[:0]
+    self._done = False
+    if self._buffer_start_pos > 0:
+      # Seek to first delimiter after the start position.
+      self._underlying.seek(self._buffer_start_pos)
+      if self.buffer_to_delim():
+        line_start = self._buffer.index(self._delim) + len(self._delim)
+        self._buffer_start_pos += line_start
+        self._buffer = self._buffer[line_start:]
+      else:
+        self._done = True
+
+  def readable(self):
+    return True
+
+  def writable(self):
+    return False
+
+  def seekable(self):
+    return False
+
+  @property
+  def closed(self):
+    return False
+
+  def __iter__(self):
+    # For pandas is_file_like.
+    raise NotImplementedError()
+
+  def buffer_to_delim(self, offset=0):
+    """Read enough of the file such that the buffer contains the delimiter, or
+    end-of-file is reached.
+    """
+    if self._delim in self._buffer[offset:]:
+      return True
+    while True:
+      chunk = self._underlying.read(self._chunk_size)
+      self._buffer += chunk
+      if self._delim in chunk:
+        return True
+      elif not chunk:
+        return False
+
+  def read(self, size=-1):
+    if self._done:
+      return self._empty
+    elif size == -1:
+      self._buffer += self._underlying.read()
+    elif not self._buffer:
+      self._buffer = self._underlying.read(size)
+
+    if self._tracker.try_claim(self._buffer_start_pos + len(self._buffer)):
+      res = self._buffer
+      self._buffer = self._empty
+      self._buffer_start_pos += len(res)
+    else:
+      offset = self._tracker.current_restriction().stop - self._buffer_start_pos
+      if self.buffer_to_delim(offset):
+        end_of_line = self._buffer.index(self._delim, offset)
+        res = self._buffer[:end_of_line + len(self._delim)]
+      else:
+        res = self._buffer
+      self._done = True
+    return res
+
+
+class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
+  def __init__(self, reader, args, kwargs, incremental, splittable, binary):
     # avoid pickling issues
     if reader.__module__.startswith('pandas.'):
       reader = reader.__name__
     self.reader = reader
     self.args = args
     self.kwargs = kwargs
     self.incremental = incremental
+    self.splittable = splittable

Review comment:
       That is correct. This relates to the comment "We could do this for all sources that are read linearly, as long as they don't try to seek." below, but for the moment we're simply not supporting progress there as it introduces additional complexity (and the progress might actually be a bit deceptive as we complete the full (typically large) read before going onto the next step).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org