Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/15 01:46:50 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

TheNeuralBit commented on a change in pull request #13488:
URL: https://github.com/apache/beam/pull/13488#discussion_r542981589



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,133 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)

Review comment:
       Could you add docstrings and/or typehints for this class so it's clear what implementations should do?
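
For illustration, here is one way such docstrings and typehints could look. This is a sketch only: the return types and contracts below are assumptions inferred from the diff, not confirmed by the PR.

```python
import io
from typing import BinaryIO, Tuple


class _Splitter:
  """Interface for splitting a byte stream at record boundaries.

  Sketch only: exact contracts are inferred from the diff above.
  """
  def empty_buffer(self) -> bytes:
    """Returns an empty buffer of the type this splitter operates on."""
    raise NotImplementedError(self)

  def read_header(self, handle: BinaryIO) -> Tuple[bytes, bytes]:
    """Reads the header from handle.

    Returns (header_bytes, buffered_bytes_read_past_the_header).
    """
    raise NotImplementedError(self)

  def read_to_record_boundary(
      self, buffered: bytes, handle: BinaryIO) -> Tuple[bytes, bytes]:
    """Reads from handle until the next record boundary.

    buffered is data already read from handle but not yet consumed.
    Returns (bytes_up_to_and_including_the_boundary, leftover_bytes).
    """
    raise NotImplementedError(self)
```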

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,134 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)
+
+
+class _DelimSplitter(_Splitter):
+  def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):

Review comment:
       Might be helpful to clarify that this is splitting on delimiters between _records_; it's easy to get mixed up in the CSV case, where there's also a field delimiter.
   
   ```suggestion
   class _RecordDelimSplitter(_Splitter):
     """ A _Splitter that splits on delimiters between records. """
     def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
   ```
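
To make the record/field distinction concrete, here is a minimal, self-contained sketch of delimiter-based record splitting. The helper is hypothetical and simplified; the PR's actual _DelimSplitter reads from a file handle in read_chunk_size chunks and handles headers, which this sketch omits.

```python
import io


def read_to_record_boundary(buffered, handle, delim=b'\n', chunk_size=1 << 20):
  """Returns (bytes up to and including the next record delim, leftover bytes).

  Hypothetical helper for illustration; not the PR's implementation.
  """
  while True:
    index = buffered.find(delim)
    if index >= 0:
      end = index + len(delim)
      return buffered[:end], buffered[end:]
    chunk = handle.read(chunk_size)
    if not chunk:
      # EOF: whatever remains is the final (unterminated) record.
      return buffered, b''
    buffered += chunk


stream = io.BytesIO(b'a,"x",1\nb,"y",2\nc,"z",3')
record, rest = read_to_record_boundary(b'', stream, b'\n')
# record == b'a,"x",1\n' -- split on the record delimiter (newline),
# not on the field delimiter (comma).
```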

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -31,16 +33,28 @@
 _DEFAULT_BYTES_CHUNKSIZE = 1 << 20
 
 
-def read_csv(path, *args, **kwargs):
+def read_csv(path, *args, splittable=False, **kwargs):
   """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
 
   Use this as
 
       df = p | beam.dataframe.io.read_csv(...)
 
   to get a deferred Beam dataframe representing the contents of the file.
+
+  If your files are large and records do not contain quoted newlines, you may
+  pass the extra argument splittable=True to enable dynamic splitting for this
+  read.

Review comment:
       Can we make this warn that using splittable=True with quoted newlines could lead to data loss?
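
One possible wording for such a warning, shown as a docstring sketch (illustrative only; the final wording was left to the PR author):

```python
def read_csv(path, *args, splittable=False, **kwargs):
  """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.

  If your files are large and records do not contain quoted newlines, you
  may pass splittable=True to enable dynamic splitting for this read.

  Warning: setting splittable=True when records contain quoted newlines
  can split a record mid-field, which may silently drop or corrupt data.
  Only enable it if you are certain no field contains an embedded newline.
  """
```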




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org