Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/04 18:54:28 UTC

[GitHub] [beam] robertwb opened a new pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

robertwb opened a new pull request #13488:
URL: https://github.com/apache/beam/pull/13488


   Supports headers, but not escaped newlines (so it is not yet on by default).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #13488:
URL: https://github.com/apache/beam/pull/13488#issuecomment-738958324


   R: @TheNeuralBit 





[GitHub] [beam] robertwb merged pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #13488:
URL: https://github.com/apache/beam/pull/13488


   





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13488:
URL: https://github.com/apache/beam/pull/13488#discussion_r542981589



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,133 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)

Review comment:
       Could you add docstrings and/or typehints for this class so it's clear what implementations should do?
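For illustration, a documented version of the interface might look roughly like the sketch below. The class name `SplitterSketch` is hypothetical, and every docstring describes an assumed contract inferred from the method names in the diff, not the behavior the PR actually implements.

```python
class SplitterSketch:
    """Hypothetical documented variant of `_Splitter`.

    All contracts below are assumptions made for illustration only.
    """

    def empty_buffer(self):
        """Return an empty buffer of the stream's type (assumed: b'' or '')."""
        raise NotImplementedError(self)

    def read_header(self, handle):
        """Read any header found at the start of `handle` (assumed contract)."""
        raise NotImplementedError(self)

    def read_to_record_boundary(self, buffered, handle):
        """Extend `buffered` from `handle` up to the next record boundary
        (assumed contract)."""
        raise NotImplementedError(self)
```

Subclasses would override all three methods; the base class only states what each one is expected to do.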

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,134 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)
+
+
+class _DelimSplitter(_Splitter):
+  def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):

Review comment:
       It might be helpful to clarify that this splits on delimiters between _records_; it's easy to get mixed up in the CSV case, where there's also a field delimiter.
   
   ```suggestion
   class _RecordDelimSplitter(_Splitter):
     """ A _Splitter that splits on delimiters between records. """
     def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
   ```
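To make the record-versus-field distinction concrete, here is a minimal standalone sketch of reading up to a record delimiter. The function signature, the `(head, rest)` return value, and the chunked-read strategy are assumptions for illustration and are not taken from the PR; only the `1 << 20` default mirrors `_DEFAULT_BYTES_CHUNKSIZE` in the diff.

```python
import io


def read_to_record_boundary(buffered, handle, delim=b"\n", chunk_size=1 << 20):
    """Return (head, rest), where head ends at the first record delimiter.

    `buffered` holds data already read; more is pulled from `handle` in
    chunks until `delim` is found or the stream is exhausted.
    """
    start = 0
    while True:
        ix = buffered.find(delim, start)
        if ix != -1:
            end = ix + len(delim)
            return buffered[:end], buffered[end:]
        # A multi-character delimiter may straddle two chunks, so the next
        # search re-scans the last len(delim) - 1 characters.
        start = max(0, len(buffered) - len(delim) + 1)
        chunk = handle.read(chunk_size)
        if not chunk:
            # End of stream: whatever is buffered is the final record.
            return buffered, buffered[:0]
        buffered += chunk


handle = io.BytesIO(b"a,b\nc,d\n")
head, rest = read_to_record_boundary(b"", handle)
# head is the whole first record, b"a,b\n". The comma inside it is a field
# delimiter and is never treated as a boundary.
```

The same function works on text streams if `buffered` and `delim` are both `str`, since it only relies on `find`, slicing, and concatenation.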

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -31,16 +33,28 @@
 _DEFAULT_BYTES_CHUNKSIZE = 1 << 20
 
 
-def read_csv(path, *args, **kwargs):
+def read_csv(path, *args, splittable=False, **kwargs):
   """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
 
   Use this as
 
       df = p | beam.dataframe.io.read_csv(...)
 
   to get a deferred Beam dataframe representing the contents of the file.
+
+  If your files are large and records do not contain quoted newlines, you may
+  pass the extra argument splittable=True to enable dynamic splitting for this
+  read.

Review comment:
       Can we make this warn that using splittable=True with quoted newlines could lead to data loss?
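A small standard-library illustration of why quoted newlines are a problem for newline-based record boundaries. It does not use Beam and does not reproduce the exact failure mode of the splittable read, which this thread does not describe; the sample data is made up.

```python
import csv
import io

# One header line and two records; record 1 has a newline inside quotes.
data = 'id,comment\n1,"line one\nline two"\n2,plain\n'

# A quote-aware parser keeps the quoted newline inside a single field.
parsed = list(csv.reader(io.StringIO(data)))

# Treating every newline as a record boundary sees four lines instead,
# and record 1 is cut in the middle of its quoted field.
naive = data.splitlines()
```

Here `parsed` has three rows while `naive` has four lines, so a split point chosen at a newline can land inside a record rather than between two records.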







[GitHub] [beam] robertwb commented on a change in pull request #13488: [BEAM-11361] Dynamic splitting of dataframe csv reads.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13488:
URL: https://github.com/apache/beam/pull/13488#discussion_r543754989



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -31,16 +33,28 @@
 _DEFAULT_BYTES_CHUNKSIZE = 1 << 20
 
 
-def read_csv(path, *args, **kwargs):
+def read_csv(path, *args, splittable=False, **kwargs):
   """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
 
   Use this as
 
       df = p | beam.dataframe.io.read_csv(...)
 
   to get a deferred Beam dataframe representing the contents of the file.
+
+  If your files are large and records do not contain quoted newlines, you may
+  pass the extra argument splittable=True to enable dynamic splitting for this
+  read.

Review comment:
       Good point. Done.

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,134 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)
+
+
+class _DelimSplitter(_Splitter):
+  def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,133 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)

Review comment:
       I actually tried to add typehints initially; they work poorly because this code works on both bytes and strings, and it's hard for mypy to know that any given use will be all one or all the other. But I added some comments at least.
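For reference, the standard way to express "all one or all the other" in Python type hints is `typing.AnyStr`. The helper below is hypothetical and is not part of the PR; whether this approach would have resolved the mypy difficulties in the actual class is not something this thread establishes.

```python
from typing import AnyStr, Tuple


def split_at_first(buffered: AnyStr, delim: AnyStr) -> Tuple[AnyStr, AnyStr]:
    """Split `buffered` just after the first `delim`.

    AnyStr is a constrained type variable: within one call, every AnyStr
    must be bytes or every AnyStr must be str, never a mixture.
    """
    ix = buffered.find(delim)
    if ix == -1:
        # No delimiter: everything is the head, and the rest is empty.
        return buffered, buffered[:0]
    end = ix + len(delim)
    return buffered[:end], buffered[end:]
```

A type checker accepts `split_at_first("a\nb", "\n")` and `split_at_first(b"a\nb", b"\n")` but should reject a call that mixes `str` and `bytes` arguments.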



