You are viewing a plain-text version of this content. The canonical link to the original message (the "here" hyperlink) was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2022/09/19 13:37:01 UTC
[iceberg] branch master updated: Python: PyArrow support for S3/S3A with properties (#5747)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f8aecf6c34 Python: PyArrow support for S3/S3A with properties (#5747)
f8aecf6c34 is described below
commit f8aecf6c3499d9fe08abebcf08b44fe5b4d1534f
Author: Joshua Robinson <33...@users.noreply.github.com>
AuthorDate: Mon Sep 19 15:36:54 2022 +0200
Python: PyArrow support for S3/S3A with properties (#5747)
---
python/pyiceberg/io/pyarrow.py | 49 +++++++++++++++++++++-------
python/tests/io/test_io.py | 56 ++++++++++++++++++--------------
python/tests/io/test_pyarrow.py | 71 ++++++++++++++++-------------------------
3 files changed, 96 insertions(+), 80 deletions(-)
diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index c07a1118aa..dd82bbfb77 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -23,10 +23,16 @@ with the pyarrow library.
"""
import os
-from typing import Union
+from functools import lru_cache
+from typing import Callable, Tuple, Union
from urllib.parse import urlparse
-from pyarrow.fs import FileInfo, FileSystem, FileType
+from pyarrow.fs import (
+ FileInfo,
+ FileSystem,
+ FileType,
+ S3FileSystem,
+)
from pyiceberg.io import (
FileIO,
@@ -35,6 +41,7 @@ from pyiceberg.io import (
OutputFile,
OutputStream,
)
+from pyiceberg.typedef import EMPTY_DICT, Properties
class PyArrowFile(InputFile, OutputFile):
@@ -59,12 +66,9 @@ class PyArrowFile(InputFile, OutputFile):
>>> # output_file.create().write(b'foobytes')
"""
- def __init__(self, location: str):
- parsed_location = urlparse(location) # Create a ParseResult from the URI
- if not parsed_location.scheme: # If no scheme, assume the path is to a local file
- self._filesystem, self._path = FileSystem.from_uri(os.path.abspath(location))
- else:
- self._filesystem, self._path = FileSystem.from_uri(location) # Infer the proper filesystem
+ def __init__(self, location: str, path: str, fs: FileSystem):
+ self._filesystem = fs
+ self._path = path
super().__init__(location=location)
def _file_info(self) -> FileInfo:
@@ -165,6 +169,24 @@ class PyArrowFile(InputFile, OutputFile):
class PyArrowFileIO(FileIO):
+ def __init__(self, properties: Properties = EMPTY_DICT):
+ self.get_fs_and_path: Callable = lru_cache(self._get_fs_and_path)
+ super().__init__(properties=properties)
+
+ def _get_fs_and_path(self, location: str) -> Tuple[FileSystem, str]:
+ uri = urlparse(location) # Create a ParseResult from the URI
+ if not uri.scheme: # If no scheme, assume the path is to a local file
+ return FileSystem.from_uri(os.path.abspath(location))
+ elif uri.scheme in {"s3", "s3a", "s3n"}:
+ client_kwargs = {
+ "endpoint_override": self.properties.get("s3.endpoint"),
+ "access_key": self.properties.get("s3.access-key-id"),
+ "secret_key": self.properties.get("s3.secret-access-key"),
+ }
+ return (S3FileSystem(**client_kwargs), uri.netloc + uri.path)
+ else:
+ return FileSystem.from_uri(location) # Infer the proper filesystem
+
def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given location
@@ -174,7 +196,8 @@ class PyArrowFileIO(FileIO):
Returns:
PyArrowFile: A PyArrowFile instance for the given location
"""
- return PyArrowFile(location)
+ fs, path = self.get_fs_and_path(location)
+ return PyArrowFile(fs=fs, location=location, path=path)
def new_output(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to write bytes to the file at the given location
@@ -185,7 +208,8 @@ class PyArrowFileIO(FileIO):
Returns:
PyArrowFile: A PyArrowFile instance for the given location
"""
- return PyArrowFile(location)
+ fs, path = self.get_fs_and_path(location)
+ return PyArrowFile(fs=fs, location=location, path=path)
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"""Delete the file at the given location
@@ -201,9 +225,10 @@ class PyArrowFileIO(FileIO):
an AWS error code 15
"""
str_path = location.location if isinstance(location, (InputFile, OutputFile)) else location
- filesystem, path = FileSystem.from_uri(str_path) # Infer the proper filesystem
+ fs, path = self.get_fs_and_path(str_path)
+
try:
- filesystem.delete_file(path)
+ fs.delete_file(path)
except FileNotFoundError:
raise
except PermissionError:
diff --git a/python/tests/io/test_io.py b/python/tests/io/test_io.py
index 0e67870b54..fe04b76f7e 100644
--- a/python/tests/io/test_io.py
+++ b/python/tests/io/test_io.py
@@ -35,7 +35,7 @@ from pyiceberg.io import (
load_file_io,
)
from pyiceberg.io.fsspec import FsspecFileIO
-from pyiceberg.io.pyarrow import PyArrowFile, PyArrowFileIO
+from pyiceberg.io.pyarrow import PyArrowFileIO
class LocalInputFile(InputFile):
@@ -133,8 +133,8 @@ class LocalFileIO(FileIO):
raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path}") from e
-@pytest.mark.parametrize("CustomInputFile", [LocalInputFile, PyArrowFile])
-def test_custom_local_input_file(CustomInputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_input_file(CustomFileIO):
"""Test initializing an InputFile implementation to read a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
@@ -146,7 +146,7 @@ def test_custom_local_input_file(CustomInputFile):
# Instantiate the input file
absolute_file_location = os.path.abspath(file_location)
- input_file = CustomInputFile(location=f"{absolute_file_location}")
+ input_file = CustomFileIO().new_input(location=f"{absolute_file_location}")
# Test opening and reading the file
f = input_file.open()
@@ -155,15 +155,15 @@ def test_custom_local_input_file(CustomInputFile):
assert len(input_file) == 3
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_output_file(CustomFileIO):
"""Test initializing an OutputFile implementation to write to a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
# Instantiate the output file
absolute_file_location = os.path.abspath(file_location)
- output_file = CustomOutputFile(location=f"{absolute_file_location}")
+ output_file = CustomFileIO().new_output(location=f"{absolute_file_location}")
# Create the output file and write to it
f = output_file.create()
@@ -176,8 +176,8 @@ def test_custom_local_output_file(CustomOutputFile):
assert len(output_file) == 3
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file_with_overwrite(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_local_output_file_with_overwrite(CustomFileIO):
"""Test initializing an OutputFile implementation to overwrite a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
output_file_location = os.path.join(tmpdirname, "foo.txt")
@@ -187,7 +187,7 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
f.write(b"foo")
# Instantiate an output file
- output_file = CustomOutputFile(location=f"{output_file_location}")
+ output_file = CustomFileIO().new_output(location=f"{output_file_location}")
# Confirm that a FileExistsError is raised when overwrite=False
with pytest.raises(FileExistsError):
@@ -201,8 +201,8 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
assert f.read() == b"bar"
-@pytest.mark.parametrize("CustomFile", [LocalInputFile, LocalOutputFile, PyArrowFile, PyArrowFile])
-def test_custom_file_exists(CustomFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_custom_file_exists(CustomFileIO):
"""Test that the exists property returns the proper value for existing and non-existing files"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
@@ -218,23 +218,31 @@ def test_custom_file_exists(CustomFile):
absolute_file_location = os.path.abspath(file_location)
non_existent_absolute_file_location = os.path.abspath(nonexistent_file_location)
- # Create File instances
- file = CustomFile(location=f"{absolute_file_location}")
- non_existent_file = CustomFile(location=f"{non_existent_absolute_file_location}")
+ # Create InputFile instances
+ file = CustomFileIO().new_input(location=f"{absolute_file_location}")
+ non_existent_file = CustomFileIO().new_input(location=f"{non_existent_absolute_file_location}")
+
+ # Test opening and reading the file
+ assert file.exists()
+ assert not non_existent_file.exists()
+
+ # Create OutputFile instances
+ file = CustomFileIO().new_output(location=f"{absolute_file_location}")
+ non_existent_file = CustomFileIO().new_output(location=f"{non_existent_absolute_file_location}")
# Test opening and reading the file
assert file.exists()
assert not non_existent_file.exists()
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_output_file_to_input_file(CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_output_file_to_input_file(CustomFileIO):
"""Test initializing an InputFile using the `to_input_file()` method on an OutputFile instance"""
with tempfile.TemporaryDirectory() as tmpdirname:
output_file_location = os.path.join(tmpdirname, "foo.txt")
# Create an output file instance
- output_file = CustomOutputFile(location=f"{output_file_location}")
+ output_file = CustomFileIO().new_output(location=f"{output_file_location}")
# Create the output file and write to it
f = output_file.create()
@@ -334,8 +342,8 @@ def test_raise_file_not_found_error_for_fileio_delete(CustomFileIO):
assert not os.path.exists(output_file_location)
-@pytest.mark.parametrize("CustomFileIO, CustomInputFile", [(LocalFileIO, LocalInputFile), (PyArrowFileIO, PyArrowFile)])
-def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_deleting_local_file_using_file_io_input_file(CustomFileIO):
"""Test deleting a local file by passing an InputFile instance to FileIO.delete(...)"""
with tempfile.TemporaryDirectory() as tmpdirname:
# Write to the temporary file
@@ -350,7 +358,7 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
assert os.path.exists(file_location)
# Instantiate the custom InputFile
- input_file = CustomInputFile(location=f"{file_location}")
+ input_file = CustomFileIO().new_input(location=f"{file_location}")
# Delete the file using the file-io implementations delete method
file_io.delete(input_file)
@@ -359,8 +367,8 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
assert not os.path.exists(file_location)
-@pytest.mark.parametrize("CustomFileIO, CustomOutputFile", [(LocalFileIO, LocalOutputFile), (PyArrowFileIO, PyArrowFile)])
-def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutputFile):
+@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
+def test_deleting_local_file_using_file_io_output_file(CustomFileIO):
"""Test deleting a local file by passing an OutputFile instance to FileIO.delete(...)"""
with tempfile.TemporaryDirectory() as tmpdirname:
# Write to the temporary file
@@ -375,7 +383,7 @@ def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutpu
assert os.path.exists(file_location)
# Instantiate the custom OutputFile
- output_file = CustomOutputFile(location=f"{file_location}")
+ output_file = CustomFileIO().new_output(location=f"{file_location}")
# Delete the file using the file-io implementations delete method
file_io.delete(output_file)
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 43dcd22a8d..1b0b07f671 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -40,7 +40,7 @@ def test_pyarrow_input_file():
# Instantiate the input file
absolute_file_location = os.path.abspath(file_location)
- input_file = PyArrowFile(location=f"{absolute_file_location}")
+ input_file = PyArrowFileIO().new_input(location=f"{absolute_file_location}")
# Test opening and reading the file
f = input_file.open()
@@ -58,7 +58,7 @@ def test_pyarrow_output_file():
# Instantiate the output file
absolute_file_location = os.path.abspath(file_location)
- output_file = PyArrowFile(location=f"{absolute_file_location}")
+ output_file = PyArrowFileIO().new_output(location=f"{absolute_file_location}")
# Create the output file and write to it
f = output_file.create()
@@ -76,12 +76,12 @@ def test_pyarrow_invalid_scheme():
"""Test that a ValueError is raised if a location is provided with an invalid scheme"""
with pytest.raises(ValueError) as exc_info:
- PyArrowFile("foo://bar/baz.txt")
+ PyArrowFileIO().new_input("foo://bar/baz.txt")
assert ("Unrecognized filesystem type in URI") in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
- PyArrowFile("foo://bar/baz.txt")
+ PyArrowFileIO().new_output("foo://bar/baz.txt")
assert ("Unrecognized filesystem type in URI") in str(exc_info.value)
@@ -96,8 +96,7 @@ def test_pyarrow_violating_input_stream_protocol():
filesystem_mock = MagicMock()
filesystem_mock.open_input_file.return_value = input_file_mock
- input_file = PyArrowFile("foo.txt")
- input_file._filesystem = filesystem_mock
+ input_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock)
f = input_file.open()
assert not isinstance(f, InputStream)
@@ -118,8 +117,7 @@ def test_pyarrow_violating_output_stream_protocol():
filesystem_mock.open_output_stream.return_value = output_file_mock
filesystem_mock.get_file_info.return_value = file_info_mock
- output_file = PyArrowFile("foo.txt")
- output_file._filesystem = filesystem_mock
+ output_file = PyArrowFile("foo.txt", path="foo.txt", fs=filesystem_mock)
f = output_file.create()
@@ -131,7 +129,7 @@ def test_raise_on_opening_a_local_file_not_found():
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
- f = PyArrowFile(file_location)
+ f = PyArrowFileIO().new_input(file_location)
with pytest.raises(FileNotFoundError) as exc_info:
f.open()
@@ -145,7 +143,7 @@ def test_raise_on_opening_a_local_file_no_permission():
with tempfile.TemporaryDirectory() as tmpdirname:
os.chmod(tmpdirname, 0o600)
file_location = os.path.join(tmpdirname, "foo.txt")
- f = PyArrowFile(file_location)
+ f = PyArrowFileIO().new_input(file_location)
with pytest.raises(PermissionError) as exc_info:
f.open()
@@ -159,7 +157,7 @@ def test_raise_on_checking_if_local_file_exists_no_permission():
with tempfile.TemporaryDirectory() as tmpdirname:
os.chmod(tmpdirname, 0o600)
file_location = os.path.join(tmpdirname, "foo.txt")
- f = PyArrowFile(file_location)
+ f = PyArrowFileIO().new_input(file_location)
with pytest.raises(PermissionError) as exc_info:
f.create()
@@ -173,7 +171,7 @@ def test_raise_on_creating_a_local_file_no_permission():
with tempfile.TemporaryDirectory() as tmpdirname:
os.chmod(tmpdirname, 0o600)
file_location = os.path.join(tmpdirname, "foo.txt")
- f = PyArrowFile(file_location)
+ f = PyArrowFileIO().new_input(file_location)
with pytest.raises(PermissionError) as exc_info:
f.create()
@@ -195,17 +193,13 @@ def test_raise_on_delete_file_with_no_permission():
assert "Cannot delete file" in str(exc_info.value)
-@patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_opening_an_s3_file_no_permission(filesystem_mock, exists_mock):
+def test_raise_on_opening_an_s3_file_no_permission():
"""Test that opening a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
s3fs_mock = MagicMock()
s3fs_mock.open_input_file.side_effect = OSError("AWS Error [code 15]")
- filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
- f = PyArrowFile("s3://foo/bar.txt")
+ f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
with pytest.raises(PermissionError) as exc_info:
f.open()
@@ -213,17 +207,13 @@ def test_raise_on_opening_an_s3_file_no_permission(filesystem_mock, exists_mock)
assert "Cannot open file, access denied:" in str(exc_info.value)
-@patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_opening_an_s3_file_not_found(filesystem_mock, exists_mock):
+def test_raise_on_opening_an_s3_file_not_found():
"""Test that a PyArrowFile raises a FileNotFoundError when the pyarrow error includes 'Path does not exist'"""
s3fs_mock = MagicMock()
s3fs_mock.open_input_file.side_effect = OSError("Path does not exist")
- filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
- f = PyArrowFile("s3://foo/bar.txt")
+ f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
with pytest.raises(FileNotFoundError) as exc_info:
f.open()
@@ -232,16 +222,13 @@ def test_raise_on_opening_an_s3_file_not_found(filesystem_mock, exists_mock):
@patch("pyiceberg.io.pyarrow.PyArrowFile.exists", return_value=False)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_raise_on_creating_an_s3_file_no_permission(filesystem_mock, exists_mock):
+def test_raise_on_creating_an_s3_file_no_permission(_):
"""Test that creating a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
s3fs_mock = MagicMock()
s3fs_mock.open_output_stream.side_effect = OSError("AWS Error [code 15]")
- filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
- f = PyArrowFile("s3://foo/bar.txt")
+ f = PyArrowFile("s3://foo/bar.txt", path="foo/bar.txt", fs=s3fs_mock)
with pytest.raises(PermissionError) as exc_info:
f.create()
@@ -249,35 +236,31 @@ def test_raise_on_creating_an_s3_file_no_permission(filesystem_mock, exists_mock
assert "Cannot create file, access denied:" in str(exc_info.value)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_deleting_s3_file_no_permission(filesystem_mock):
+def test_deleting_s3_file_no_permission():
"""Test that a PyArrowFile raises a PermissionError when the pyarrow OSError includes 'AWS Error [code 15]'"""
s3fs_mock = MagicMock()
s3fs_mock.delete_file.side_effect = OSError("AWS Error [code 15]")
- filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
- file_io = PyArrowFileIO()
+ with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
+ submocked.return_value = (s3fs_mock, "bar/foo.txt")
- with pytest.raises(PermissionError) as exc_info:
- file_io.delete("s3://foo/bar.txt")
+ with pytest.raises(PermissionError) as exc_info:
+ PyArrowFileIO().delete("s3://foo/bar.txt")
assert "Cannot delete file, access denied:" in str(exc_info.value)
-@patch("pyiceberg.io.pyarrow.FileSystem")
-def test_deleting_s3_file_not_found(filesystem_mock):
+def test_deleting_s3_file_not_found():
"""Test that a PyArrowFile raises a PermissionError when the pyarrow error includes 'AWS Error [code 15]'"""
s3fs_mock = MagicMock()
s3fs_mock.delete_file.side_effect = OSError("Path does not exist")
- filesystem_mock.from_uri.return_value = (s3fs_mock, "foo.txt")
-
- file_io = PyArrowFileIO()
+ with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
+ submocked.return_value = (s3fs_mock, "bar/foo.txt")
- with pytest.raises(FileNotFoundError) as exc_info:
- file_io.delete("s3://foo/bar.txt")
+ with pytest.raises(FileNotFoundError) as exc_info:
+ PyArrowFileIO().delete("s3://foo/bar.txt")
- assert "Cannot delete file, does not exist:" in str(exc_info.value)
+ assert "Cannot delete file, does not exist:" in str(exc_info.value)