You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2022/09/19 13:37:01 UTC

[iceberg] branch master updated: Python: PyArrow support for S3/S3A with properties (#5747)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f8aecf6c34 Python: PyArrow support for S3/S3A with properties (#5747)
f8aecf6c34 is described below

commit f8aecf6c3499d9fe08abebcf08b44fe5b4d1534f
Author: Joshua Robinson <33...@users.noreply.github.com>
AuthorDate: Mon Sep 19 15:36:54 2022 +0200

    Python: PyArrow support for S3/S3A with properties (#5747)
---
 python/pyiceberg/io/pyarrow.py  | 49 +++++++++++++++++++++-------
 python/tests/io/test_io.py      | 56 ++++++++++++++++++--------------
 python/tests/io/test_pyarrow.py | 71 ++++++++++++++++-------------------------
 3 files changed, 96 insertions(+), 80 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index c07a1118aa..dd82bbfb77 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -23,10 +23,16 @@ with the pyarrow library.
 """
 
 import os
-from typing import Union
+from functools import lru_cache
+from typing import Callable, Tuple, Union
 from urllib.parse import urlparse
 
-from pyarrow.fs import FileInfo, FileSystem, FileType
+from pyarrow.fs import (
+    FileInfo,
+    FileSystem,
+    FileType,
+    S3FileSystem,
+)
 
 from pyiceberg.io import (
     FileIO,
@@ -35,6 +41,7 @@ from pyiceberg.io import (
     OutputFile,
     OutputStream,
 )
+from pyiceberg.typedef import EMPTY_DICT, Properties
 
 
 class PyArrowFile(InputFile, OutputFile):
@@ -59,12 +66,9 @@ class PyArrowFile(InputFile, OutputFile):
         >>> # output_file.create().write(b'foobytes')
     """
 
-    def __init__(self, location: str):
-        parsed_location = urlparse(location)  # Create a ParseResult from the URI
-        if not parsed_location.scheme:  # If no scheme, assume the path is to a local file
-            self._filesystem, self._path = FileSystem.from_uri(os.path.abspath(location))
-        else:
-            self._filesystem, self._path = FileSystem.from_uri(location)  # Infer the proper filesystem
+    def __init__(self, location: str, path: str, fs: FileSystem):
+        self._filesystem = fs
+        self._path = path
         super().__init__(location=location)
 
     def _file_info(self) -> FileInfo:
@@ -165,6 +169,24 @@ class PyArrowFile(InputFile, OutputFile):
 
 
 class PyArrowFileIO(FileIO):
+    def __init__(self, properties: Properties = EMPTY_DICT):
+        self.get_fs_and_path: Callable = lru_cache(self._get_fs_and_path)
+        super().__init__(properties=properties)
+
+    def _get_fs_and_path(self, location: str) -> Tuple[FileSystem, str]:
+        uri = urlparse(location)  # Create a ParseResult from the URI
+        if not uri.scheme:  # If no scheme, assume the path is to a local file
+            return FileSystem.from_uri(os.path.abspath(location))
+        elif uri.scheme in {"s3", "s3a", "s3n"}:
+            client_kwargs = {
+                "endpoint_override": self.properties.get("s3.endpoint"),
+                "access_key": self.properties.get("s3.access-key-id"),
+                "secret_key": self.properties.get("s3.secret-access-key"),
+            }
+            return (S3FileSystem(**client_kwargs), uri.netloc + uri.path)
+        else:
+            return FileSystem.from_uri(location)  # Infer the proper filesystem
+
     def new_input(self, location: str) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location
 
@@ -174,7 +196,8 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location
         """
-        return PyArrowFile(location)
+        fs, path = self.get_fs_and_path(location)
+        return PyArrowFile(fs=fs, location=location, path=path)
 
     def new_output(self, location: str) -> PyArrowFile:
         """Get a PyArrowFile instance to write bytes to the file at the given location
@@ -185,7 +208,8 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location
         """
-        return PyArrowFile(location)
+        fs, path = self.get_fs_and_path(location)
+        return PyArrowFile(fs=fs, location=location, path=path)
 
     def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
         """Delete the file at the given location
@@ -201,9 +225,10 @@ class PyArrowFileIO(FileIO):
                 an AWS error code 15
         """
         str_path = location.location if isinstance(location, (InputFile, OutputFile)) else location
-        filesystem, path = FileSystem.from_uri(str_path)  # Infer the proper filesystem
+        fs, path = self.get_fs_and_path(str_path)
+
         try:
-            filesystem.delete_file(path)
+            fs.delete_file(path)
         except FileNotFoundError:
             raise
         except PermissionError:
diff --git a/python/tests/io/test_io.py b/python/tests/io/test_io.py
index 0e67870b54..fe04b76f7e 100644
--- a/python/tests/io/test_io.py
+++ b/python/tests/io/test_io.py
@@ -35,7 +35,7 @@ from pyiceberg.io import (
     load_file_io,
 )
 from pyiceberg.io.fsspec import FsspecFileIO
-from pyiceberg.io.pyarrow import PyArrowFile, PyArrowFileIO
+from pyiceberg.io.pyarrow import PyArrowFileIO
 
 
 class LocalInputFile(InputFile):
@@ -133,8 +133,8 @@ class LocalFileIO(FileIO):
             raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path}") from e
 
 
-@pytest.mark.parametrize("CustomInputFile", [LocalInputFile, PyArrowFile])
-def test_custom_local_input_file(CustomInputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_input_file(CustomFileIO):
     """Test initializing an InputFile implementation to read a local file"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
@@ -146,7 +146,7 @@ def test_custom_local_input_file(CustomInputFile):
 
         # Instantiate the input file
         absolute_file_location = os.path.abspath(file_location)
-        input_file = CustomInputFile(location=f"{absolute_file_location}")
+        input_file = CustomFileIO().new_input(location=f"{absolute_file_location}")
 
         # Test opening and reading the file
         f = input_file.open()
@@ -155,15 +155,15 @@ def test_custom_local_input_file(CustomInputFile):
         assert len(input_file) == 3
 
 
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_output_file(CustomFileIO):
     """Test initializing an OutputFile implementation to write to a local file"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
 
         # Instantiate the output file
         absolute_file_location = os.path.abspath(file_location)
-        output_file = CustomOutputFile(location=f"{absolute_file_location}")
+        output_file = CustomFileIO().new_output(location=f"{absolute_file_location}")
 
         # Create the output file and write to it
         f = output_file.create()
@@ -176,8 +176,8 @@ def test_custom_local_output_file(CustomOutputFile):
         assert len(output_file) == 3
 
 
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file_with_overwrite(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_output_file_with_overwrite(CustomFileIO):
     """Test initializing an OutputFile implementation to overwrite a local file"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         output_file_location = os.path.join(tmpdirname, "foo.txt")
@@ -187,7 +187,7 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
             f.write(b"foo")
 
         # Instantiate an output file
-        output_file = CustomOutputFile(location=f"{output_file_location}")
+        output_file = CustomFileIO().new_output(location=f"{output_file_location}")
 
         # Confirm that a FileExistsError is raised when overwrite=False
         with pytest.raises(FileExistsError):
@@ -201,8 +201,8 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
             assert f.read() == b"bar"
 
 
-@pytest.mark.parametrize("CustomFile", [LocalInputFile, LocalOutputFile, PyArrowFile, PyArrowFile])
-def test_custom_file_exists(CustomFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_file_exists(CustomFileIO):
     """Test that the exists property returns the proper value for existing and non-existing files"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
@@ -218,23 +218,31 @@ def test_custom_file_exists(CustomFile):
         absolute_file_location = os.path.abspath(file_location)
         non_existent_absolute_file_location = os.path.abspath(nonexistent_file_location)
 
-        # Create File instances
-        file = CustomFile(location=f"{absolute_file_location}")
-        non_existent_file = CustomFile(location=f"{non_existent_absolute_file_location}")
+        # Create InputFile instances
+        file = CustomFileIO().new_input(location=f"{absolute_file_location}")
+        non_existent_file = CustomFileIO().new_input(location=f"{non_existent_absolute_file_location}")
+
+        # Test opening and reading the file
+        assert file.exists()
+        assert not non_existent_file.exists()
+
+        # Create OutputFile instances
+        file = CustomFileIO().new_output(location=f"{absolute_file_location}")
+        non_existent_file = CustomFileIO().new_output(location=f"{non_existent_absolute_file_location}")
 
         # Test opening and reading the file
         assert file.exists()
         assert not non_existent_file.exists()
 
 
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_output_file_to_input_file(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_output_file_to_input_file(CustomFileIO):
     """Test initializing an InputFile using the `to_input_file()` method on an OutputFile instance"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         output_file_location = os.path.join(tmpdirname, "foo.txt")
 
         # Create an output file instance
-        output_file = CustomOutputFile(location=f"{output_file_location}")
+        output_file = CustomFileIO().new_output(location=f"{output_file_location}")
 
         # Create the output file and write to it
         f = output_file.create()
@@ -334,8 +342,8 @@ def test_raise_file_not_found_error_for_fileio_delete(CustomFileIO):
         assert not os.path.exists(output_file_location)
 
 
-@pytest.mark.parametrize("CustomFileIO, CustomInputFile", [(LocalFileIO, LocalInputFile), (PyArrowFileIO, PyArrowFile)])
-def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_deleting_local_file_using_file_io_input_file(CustomFileIO):
     """Test deleting a local file by passing an InputFile instance to FileIO.delete(...)"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         # Write to the temporary file
@@ -350,7 +358,7 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
         assert os.path.exists(file_location)
 
         # Instantiate the custom InputFile
-        input_file = CustomInputFile(location=f"{file_location}")
+        input_file = CustomFileIO().new_input(location=f"{file_location}")
 
         # Delete the file using the file-io implementations delete method
         file_io.delete(input_file)
@@ -359,8 +367,8 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
         assert not os.path.exists(file_location)
 
 
-@pytest.mark.parametrize("CustomFileIO, CustomOutputFile", [(LocalFileIO, LocalOutputFile), (PyArrowFileIO, PyArrowFile)])
-def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_deleting_local_file_using_file_io_output_file(CustomFileIO):
     """Test deleting a local file by passing an OutputFile instance to FileIO.delete(...)"""
     with tempfile.TemporaryDirectory() as tmpdirname:
         # Write to the temporary file
@@ -375,7 +383,7 @@ def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutpu
         assert os.path.exists(file_location)
 
         # Instantiate the custom OutputFile
-        output_file = CustomOutputFile(location=f"{file_location}")
+        output_file = CustomFileIO().new_output(location=f"{file_location}")
 
         # Delete the file using the file-io implementations delete method
         file_io.delete(output_file)
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 43dcd22a8d..1b0b07f671 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -40,7 +40,7 @@ def test_pyarrow_input_file():
 
         # Instantiate the input file
         absolute_file_location = os.path.abspath(file_location)
-        input_file = PyArrowFile(location=f"{absolute_file_location}")
+        input_file = PyArrowFileIO().new_input(location=f"{absolute_file_location}")
 
         # Test opening and reading the file
         f = input_file.open()
@@ -58,7 +58,7 @@ def test_pyarrow_output_file():
 
         # Instantiate the output file
         absolute_file_location = os.path.abspath(file_location)
-        output_file = PyArrowFile(location=f"{absolute_file_location}")
+        output_file = PyArrowFileIO().new_output(location=f"{absolute_file_location}")
 
         # Create the output file and write to it
         f = output_file.create()
@@ -76,12 +76,12 @@ def test_pyarrow_invalid_scheme():
     """Test that a ValueError is raised if a location is provided with an invalid scheme"""
 
     with pytest.raises(ValueError) as exc_info:
-        PyArrowFile("foo://bar/baz.txt")
+        PyArrowFileIO().new_input("foo://bar/baz.txt")
 
     assert ("Unrecognized filesystem type in URI") in str(exc_info.value)
 
     with pytest.raises(ValueError) as exc_info:
-        PyArrowFile("foo://bar/baz.txt")
+        PyArrowFileIO().new_output("foo://bar/baz.txt")
 
     assert ("Unrecognized filesystem type in URI") in str(exc_info.value)
 
@@ -96,8 +96,7 @@ def test_pyarrow_violating_input_stream_protocol():
     filesystem_mock = MagicMock()
     filesystem_mock.open_input_file.return_value = input_file_mock
 
-    input_file = PyArrowFile("foo.txt")
-    input_file._filesystem = filesystem_mock
+    input_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock)
 
     f = input_file.open()
     assert not isinstance(f, InputStream)
@@ -118,8 +117,7 @@ def test_pyarrow_violating_output_stream_protocol():
     filesystem_mock.open_output_stream.return_value = output_file_mock
     filesystem_mock.get_file_info.return_value = file_info_mock
 
-    output_file = PyArrowFile("foo.txt")
-    output_file._filesystem = filesystem_mock
+    output_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock)
 
     f = output_file.create()
 
@@ -131,7 +129,7 @@ def test_raise_on_opening_a_local_file_not_found():
 
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
-        f = PyArrowFile(file_location)
+        f = PyArrowFileIO().new_input(file_location)
 
         with pytest.raises(FileNotFoundError) as exc_info:
             f.open()
@@ -145,7 +143,7 @@ def test_raise_on_opening_a_local_file_no_permission():
     with tempfile.TemporaryDirectory() as tmpdirname:
         os.chmod(tmpdirname, 0o600)
         file_location = os.path.join(tmpdirname, "foo.txt")
-        f = PyArrowFile(file_location)
+        f = PyArrowFileIO().new_input(file_location)
 
         with pytest.raises(PermissionError) as exc_info:
             f.open()
@@ -159,7 +157,7 @@ def test_raise_on_checking_if_local_file_exists_no_permission():
     with tempfile.TemporaryDirectory() as tmpdirname:
         os.chmod(tmpdirname, 0o600)
         file_location = os.path.join(tmpdirname, "foo.txt")
-        f = PyArrowFile(file_location)
+        f = PyArrowFileIO().new_input(file_location)
 
         with pytest.raises(PermissionError) as exc_info:
             f.create()
@@ -173,7 +171,7 @@ def test_raise_on_creating_a_local_file_no_permission():
     with tempfile.TemporaryDirectory() as tmpdirname:
         os.chmod(tmpdirname, 0o600)
         file_location = os.path.join(tmpdirname, "foo.txt")
-        f = PyArrowFile(file_location)
+        f = PyArrowFileIO().new_input(file_location)
 
         with pytest.raises(PermissionError) as exc_info:
             f.create()
@@ -195,17 +193,13 @@ def test_raise_on_delete_file_with_no_permission():
         assert "Cannot delete file" in str(exc_info.value)
 
 
-@patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_opening_an_s3_file_no_permission(filesystem_mock, exists_mock):
+def test_raise_on_opening_an_s3_file_no_permission():
     """Test that opening a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
 
     s3fs_mock = MagicMock()
     s3fs_mock.open_input_file.side_effect = OSError("AWS Error [code 15]")
 
-    filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
-    f = PyArrowFile("s3://foo/bar.txt")
+    f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
 
     with pytest.raises(PermissionError) as exc_info:
         f.open()
@@ -213,17 +207,13 @@ def test_raise_on_opening_an_s3_file_no_permission(filesystem_mock, exists_mock)
     assert "Cannot open file, access denied:" in str(exc_info.value)
 
 
-@patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_opening_an_s3_file_not_found(filesystem_mock, exists_mock):
+def test_raise_on_opening_an_s3_file_not_found():
     """Test that a PyArrowFile raises a FileNotFoundError when the pyarrow error includes 'Path does not exist'"""
 
     s3fs_mock = MagicMock()
     s3fs_mock.open_input_file.side_effect = OSError("Path does not exist")
 
-    filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
-    f = PyArrowFile("s3://foo/bar.txt")
+    f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
 
     with pytest.raises(FileNotFoundError) as exc_info:
         f.open()
@@ -232,16 +222,13 @@ def test_raise_on_opening_an_s3_file_not_found(filesystem_mock, exists_mock):
 
 
 @patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_creating_an_s3_file_no_permission(filesystem_mock, exists_mock):
+def test_raise_on_creating_an_s3_file_no_permission(_):
     """Test that creating a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
 
     s3fs_mock = MagicMock()
     s3fs_mock.open_output_stream.side_effect = OSError("AWS Error [code 15]")
 
-    filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
-    f = PyArrowFile("s3://foo/bar.txt")
+    f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
 
     with pytest.raises(PermissionError) as exc_info:
         f.create()
@@ -249,35 +236,31 @@ def test_raise_on_creating_an_s3_file_no_permission(filesystem_mock, exists_mock
     assert "Cannot create file, access denied:" in str(exc_info.value)
 
 
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_deleting_s3_file_no_permission(filesystem_mock):
+def test_deleting_s3_file_no_permission():
     """Test that a PyArrowFile raises a PermissionError when the pyarrow OSError includes 'AWS Error [code 15]'"""
 
     s3fs_mock = MagicMock()
     s3fs_mock.delete_file.side_effect = OSError("AWS Error [code 15]")
 
-    filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
-    file_io = PyArrowFileIO()
+    with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
+        submocked.return_value = (s3fs_mock, "bar/foo.txt")
 
-    with pytest.raises(PermissionError) as exc_info:
-        file_io.delete("s3://foo/bar.txt")
+        with pytest.raises(PermissionError) as exc_info:
+            PyArrowFileIO().delete("s3://foo/bar.txt")
 
     assert "Cannot delete file, access denied:" in str(exc_info.value)
 
 
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_deleting_s3_file_not_found(filesystem_mock):
+def test_deleting_s3_file_not_found():
     """Test that a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
 
     s3fs_mock = MagicMock()
     s3fs_mock.delete_file.side_effect = OSError("Path does not exist")
 
-    filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
-    file_io = PyArrowFileIO()
+    with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
+        submocked.return_value = (s3fs_mock, "bar/foo.txt")
 
-    with pytest.raises(FileNotFoundError) as exc_info:
-        file_io.delete("s3://foo/bar.txt")
+        with pytest.raises(FileNotFoundError) as exc_info:
+            PyArrowFileIO().delete("s3://foo/bar.txt")
 
-    assert "Cannot delete file, does not exist:" in str(exc_info.value)
+        assert "Cannot delete file, does not exist:" in str(exc_info.value)