Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/03 05:37:25 UTC

[GitHub] [beam] chadrik opened a new pull request #11022: [BEAM-7746] Resolve typing issues in filesystem

chadrik opened a new pull request #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022
 
 
   In order to properly type some of the complexities around the `filesystem` API, I had to address two main problems:
   
   - properly type `CompressedFile`
     - add `Compressor` and `Decompressor` protocols
     - make it a `BinaryIO` subclass. This is necessary to guarantee that all the file-like objects floating around these modules actually present the same interface. Unfortunately, `BinaryIO` is not a protocol, so it must be subclassed. This required a few breaking changes:
       - `writeable` renamed to `writable`
       - `closed` method changed to a property
       - `seekable` property changed to a method
       - `read` arg changed name
       - `write` arg changed name
   - add a new `create_buffered()` classmethod to `DownloaderStream` and `UploaderStream`
      - simplifies creating buffered wrappers around these classes
      - this was motivated primarily by the desire to consolidate this pattern in one place so that I could handle the unfortunate need to `cast` these types to `BinaryIO`. See the notes on `DownloaderStream.create_buffered()` for why this was necessary.
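A minimal sketch of what `Compressor` and `Decompressor` protocols could look like, assuming Python 3.8+ for `typing.Protocol` (older interpreters would need `typing_extensions`). The member sets below are an assumption, chosen so that the standard library's `zlib` and `bz2` objects satisfy them structurally; the protocols in this PR may declare different members. `compress_chunks` and `decompress_chunks` are hypothetical helpers.

```python
import bz2
import zlib
from typing import Iterable, Protocol


class Compressor(Protocol):
    """Structural type for incremental compressors (assumed members)."""

    def compress(self, data: bytes) -> bytes: ...

    def flush(self) -> bytes: ...


class Decompressor(Protocol):
    """Structural type for incremental decompressors (assumed members)."""

    def decompress(self, data: bytes) -> bytes: ...


def compress_chunks(compressor: Compressor, chunks: Iterable[bytes]) -> bytes:
    # Hypothetical helper: feed every chunk, then flush the remaining state.
    parts = [compressor.compress(chunk) for chunk in chunks]
    parts.append(compressor.flush())
    return b''.join(parts)


def decompress_chunks(decompressor: Decompressor,
                      chunks: Iterable[bytes]) -> bytes:
    # Hypothetical helper: decompress a stream that arrives in pieces.
    return b''.join(decompressor.decompress(chunk) for chunk in chunks)


# Neither zlib nor bz2 objects inherit from these classes; a type checker
# accepts them because their methods match the protocols structurally.
payload = compress_chunks(zlib.compressobj(), [b'hello ', b'world'])
assert zlib.decompress(payload) == b'hello world'

bz2_payload = compress_chunks(bz2.BZ2Compressor(), [b'hello ', b'world'])
half = len(bz2_payload) // 2
assert decompress_chunks(bz2.BZ2Decompressor(),
                         [bz2_payload[:half], bz2_payload[half:]]) == b'hello world'
```

Using protocols rather than base classes means the existing compressor objects need no wrappers to type-check.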
   
   --- 
   
   This PR makes one significant runtime change that must be vetted.
   
   `s3io.S3IO.open()` originally created a buffered `DownloaderStream` like this:
   
   ```python
         return io.BufferedReader(
             DownloaderStream(downloader, mode=mode), 
             buffer_size=read_buffer_size)
   ```
   
   All other IO clients pass `read_buffer_size` to `DownloaderStream`, like this example from `gcsio`:
   
   ```python
         return io.BufferedReader(
             DownloaderStream(downloader, read_buffer_size=read_buffer_size, mode=mode),
             buffer_size=read_buffer_size)
   ```
   
   I was hoping to standardize on that behavior in order to simplify `create_buffered`, but I don't know whether this difference was intentional or not, or what the impact of changing it would be. 
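The standardized behavior proposed here can be sketched as follows. `InMemoryDownloaderStream` is a hypothetical stand-in for `DownloaderStream` that reads from a `bytes` object instead of a `Downloader`, and its `create_buffered` only mirrors the shape of the classmethod in this PR: it passes `read_buffer_size` to both the raw stream and the `io.BufferedReader`, the way `gcsio` does. The `cast` is a no-op at runtime; it only tells the type checker to treat the reader as `BinaryIO`.

```python
import io
from typing import BinaryIO, Type, TypeVar, cast

StreamT = TypeVar('StreamT', bound='InMemoryDownloaderStream')


class InMemoryDownloaderStream(io.RawIOBase):
    """Hypothetical raw stream standing in for DownloaderStream."""

    def __init__(self, data: bytes,
                 read_buffer_size: int = io.DEFAULT_BUFFER_SIZE,
                 mode: str = 'rb') -> None:
        super().__init__()
        self._data = data
        self._pos = 0
        self.read_buffer_size = read_buffer_size
        self.mode = mode

    def readable(self) -> bool:
        return True

    def readinto(self, b) -> int:
        # Copy as many bytes as fit into the caller's buffer; 0 means EOF.
        chunk = self._data[self._pos:self._pos + len(b)]
        b[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)

    @classmethod
    def create_buffered(cls: Type[StreamT], data: bytes,
                        read_buffer_size: int = io.DEFAULT_BUFFER_SIZE,
                        mode: str = 'rb') -> BinaryIO:
        # The same size goes to the raw stream and to the BufferedReader,
        # matching the gcsio call quoted above.
        raw = cls(data, read_buffer_size=read_buffer_size, mode=mode)
        return cast(BinaryIO,
                    io.BufferedReader(raw, buffer_size=read_buffer_size))


reader = InMemoryDownloaderStream.create_buffered(b'0123456789',
                                                  read_buffer_size=4)
assert reader.read(3) == b'012'
assert reader.read() == b'3456789'
assert reader.mode == 'rb'  # BufferedReader delegates `mode` to its raw stream
assert type(reader) is io.BufferedReader  # cast did not change the object
```

Note that `mode` is only visible on the `BufferedReader` because the raw stream defines it, which is the conditional-attribute behavior that prevents `io.Buffered*` from satisfying a hand-written `BinaryIO` protocol.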
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] chadrik commented on a change in pull request #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on a change in pull request #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#discussion_r386809980
 
 

 ##########
 File path: sdks/python/apache_beam/io/localfilesystem.py
 ##########
 @@ -136,10 +137,17 @@ def _path_open(
       mode,
       mime_type='application/octet-stream',
       compression_type=CompressionTypes.AUTO):
+    # type: (...) -> BinaryIO
+
     """Helper functions to open a file in the provided mode.
     """
     compression_type = FileSystem._get_compression_type(path, compression_type)
-    raw_file = open(path, mode)
+    if mode == 'r':
+      mode = 'rb'
+    elif mode == 'w':
+      mode = 'wb'
 
 Review comment:
   I saw this pattern in other filesystems, so I thought I'd add it here to make it completely obvious that the subsequent cast to `BinaryIO` (from `IO[Any]`) is safe and warranted.
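A minimal sketch of this normalization, using a hypothetical `open_binary` helper in place of `_path_open` (compression handling omitted). Mapping the text modes `'r'` and `'w'` to `'rb'` and `'wb'` means `open()` always returns a binary file object, which is what makes the `cast` to `BinaryIO` sound; the cast itself does nothing at runtime.

```python
import os
import tempfile
from typing import BinaryIO, cast


def open_binary(path: str, mode: str) -> BinaryIO:
    """Hypothetical helper: open `path` in binary mode, typed as BinaryIO."""
    # Map text modes to binary modes so open() returns a binary stream.
    # Other modes are passed through unchanged in this sketch.
    if mode == 'r':
        mode = 'rb'
    elif mode == 'w':
        mode = 'wb'
    raw_file = open(path, mode)
    # With a non-literal `mode`, typeshed types open() as returning IO[Any];
    # after the normalization above the object is binary, so the cast is safe.
    return cast(BinaryIO, raw_file)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'data.bin')
    with open_binary(path, 'w') as f:
        f.write(b'payload')
    with open_binary(path, 'r') as f:
        assert f.read() == b'payload'
```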

[GitHub] [beam] chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#issuecomment-594143027
 
 
   Run Python PreCommit

[GitHub] [beam] chadrik commented on a change in pull request #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on a change in pull request #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#discussion_r387300879
 
 

 ##########
 File path: sdks/python/apache_beam/io/filesystemio.py
 ##########
 @@ -175,10 +190,31 @@ def readall(self):
       res.append(data)
     return b''.join(res)
 
+  @classmethod
+  def create_buffered(cls,  # type: Type[DownloaderStreamT]
+                      downloader,  # type: Downloader
+                      read_buffer_size=io.DEFAULT_BUFFER_SIZE, mode='rb'):
+    # type: (...) -> BinaryIO
+    # In the python3 typeshed, io.BufferedReader and io.BufferedWriter do not
+    # inherit from typing.BinaryIO, and BinaryIO is not a Protocol, so a class
+    # must inherit from it.  We could roll our own BinaryIO Protocol, but the
+    # stubs for io.Buffered* do not have the required 'mode' or 'name' attrs to
+    # meet the protocol (the classes seem to expose 'mode' and 'name'
+    # conditionally, if their underlying io.RawIOBase possess the attributes).
+    # Thus, it is necessary to cast these types to BinaryIO either way.
+    return cast(
+        BinaryIO,
+        io.BufferedReader(
+            cls(downloader, read_buffer_size=read_buffer_size, mode=mode),
+            buffer_size=read_buffer_size))
 
 Review comment:
   Should I create separate args for the buffer size of the `DownloaderStream` and the `io.BufferedReader`? The treatment of these varies between filesystems.
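One possible shape for separate arguments, sketched under assumptions: a hypothetical `create_buffered` function takes `buffer_size` (forwarded to `io.BufferedReader`) and `raw_buffer_size` (forwarded to the raw stream), with the raw size defaulting to the buffered one so that callers passing a single value keep the current `gcsio` behavior. Both parameter names and `RecordingRawStream`, a stand-in for `DownloaderStream` that only records the size it received, are illustrative.

```python
import io
from typing import BinaryIO, Optional, cast


class RecordingRawStream(io.RawIOBase):
    """Hypothetical raw stream that remembers the buffer size it received."""

    def __init__(self, data: bytes, read_buffer_size: int) -> None:
        super().__init__()
        self._source = io.BytesIO(data)
        self.read_buffer_size = read_buffer_size

    def readable(self) -> bool:
        return True

    def readinto(self, b) -> int:
        # Delegate to an in-memory source; returns 0 at end of stream.
        return self._source.readinto(b)


def create_buffered(data: bytes,
                    buffer_size: int = io.DEFAULT_BUFFER_SIZE,
                    raw_buffer_size: Optional[int] = None) -> BinaryIO:
    """Hypothetical factory with a separate size for each buffering layer."""
    if raw_buffer_size is None:
        # Default: reuse one size for both layers, as gcsio does today.
        raw_buffer_size = buffer_size
    raw = RecordingRawStream(data, read_buffer_size=raw_buffer_size)
    return cast(BinaryIO, io.BufferedReader(raw, buffer_size=buffer_size))


reader = create_buffered(b'abc', buffer_size=8, raw_buffer_size=1024)
assert reader.read() == b'abc'
```

The `Optional` default keeps single-size call sites unchanged while letting a filesystem such as `s3io` keep the raw stream at a different size from the `BufferedReader`.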

[GitHub] [beam] chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#issuecomment-594143930
 
 
   The lint job is failing on a file that doesn't exist...
   
   ```
   ************* Module apache_beam.runners.interactive.utils
   apache_beam/runners/interactive/utils.py:25:0: W0611: Unused WindowedValue imported from apache_beam.utils.windowed_value (unused-import) 
   ```
   

[GitHub] [beam] chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#issuecomment-593803091
 
 
   R: @robertwb 
   R: @udim 
   

[GitHub] [beam] chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem

Posted by GitBox <gi...@apache.org>.
chadrik commented on issue #11022: [BEAM-7746] Resolve typing issues in filesystem
URL: https://github.com/apache/beam/pull/11022#issuecomment-594101492
 
 
   Run Python PreCommit
