You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/01 22:50:42 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #13443: [BEAM-11361] Dynamic splitting of SDF IOs.

boyuanzz commented on a change in pull request #13443:
URL: https://github.com/apache/beam/pull/13443#discussion_r533773617



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -181,39 +193,153 @@ def expand(self, root):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), than this
+  would give a view of a 500-byte file consisting of bytes bytes 110 to 609
+  (inclusive) of the underlying file.
+
+  As with all SDF trackers, the endpoint may change dynamically during reading.
+  """
+  def __init__(
+      self,
+      underlying,
+      tracker,
+      delim=b'\n',
+      chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
+    self._underlying = underlying
+    self._tracker = tracker
+    self._buffer_start_pos = self._tracker.current_restriction().start
+    self._delim = delim
+    self._chunk_size = chunk_size
+
+    self._buffer = self._empty = self._delim[:0]
+    self._done = False
+    if self._buffer_start_pos > 0:
+      # Seek to first delimiter after the start position.
+      self._underlying.seek(self._buffer_start_pos)
+      if self.buffer_to_delim():
+        line_start = self._buffer.index(self._delim) + len(self._delim)
+        self._buffer_start_pos += line_start
+        self._buffer = self._buffer[line_start:]
+      else:
+        self._done = True
+
+  def readable(self):
+    return True
+
+  def writable(self):
+    return False
+
+  def seekable(self):
+    return False
+
+  @property
+  def closed(self):
+    return False
+
+  def __iter__(self):
+    # For pandas is_file_like.
+    raise NotImplementedError()
+
+  def buffer_to_delim(self, offset=0):
+    """Read enough of the file such that the buffer contains the delimiter, or
+    end-of-file is reached.
+    """
+    if self._delim in self._buffer[offset:]:
+      return True
+    while True:
+      chunk = self._underlying.read(self._chunk_size)
+      self._buffer += chunk
+      if self._delim in chunk:
+        return True
+      elif not chunk:
+        return False
+
+  def read(self, size=-1):
+    if self._done:
+      return self._empty
+    elif size == -1:
+      self._buffer += self._underlying.read()
+    elif not self._buffer:
+      self._buffer = self._underlying.read(size)
+
+    if self._tracker.try_claim(self._buffer_start_pos + len(self._buffer)):
+      res = self._buffer
+      self._buffer = self._empty
+      self._buffer_start_pos += len(res)
+    else:
+      offset = self._tracker.current_restriction().stop - self._buffer_start_pos
+      if self.buffer_to_delim(offset):
+        end_of_line = self._buffer.index(self._delim, offset)
+        res = self._buffer[:end_of_line + len(self._delim)]
+      else:
+        res = self._buffer
+      self._done = True
+    return res
+
+
+class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
+  def __init__(self, reader, args, kwargs, incremental, splittable, binary):
     # avoid pickling issues
     if reader.__module__.startswith('pandas.'):
       reader = reader.__name__
     self.reader = reader
     self.args = args
     self.kwargs = kwargs
     self.incremental = incremental
+    self.splittable = splittable

Review comment:
       I want to make sure I understand how `incremental` and `splittable` work together:
   * If `incremental == True` and `splittable == True`, the reading is both splittable and we will have progress on that.
   * If `incremental == True` and `splittable == False`, the reading is not splittable but we will have progress on that.
   * If `incremental ==False` and `splittable ==True`, the reading is not splittable and no progress on it.
   * If `incremental ==False` and `splittable == False`, the reading is not splittable and no progress on it.
   
   Does it correct?

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -181,39 +193,153 @@ def expand(self, root):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), than this

Review comment:
       ```suggestion
     exactly 10 characters long (i.e. every 10th charcter was a newline), then this
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org