You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/02/15 09:26:05 UTC
[iceberg] branch master updated: Python: Set PyArrow as the default FileIO (#6822)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbea4aee9 Python: Set PyArrow as the default FileIO (#6822)
3bbea4aee9 is described below
commit 3bbea4aee93ea90f153af4640a548fe40c4a7e65
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Feb 15 10:25:59 2023 +0100
Python: Set PyArrow as the default FileIO (#6822)
PyArrow is the most feature-complete FileIO,
and it can be a bit confusing.
---
python/mkdocs/docs/configuration.md | 23 +++++++++++++++
python/pyiceberg/io/__init__.py | 8 ++---
python/tests/catalog/test_glue.py | 58 ++++++++++++++++++-------------------
python/tests/io/test_io.py | 3 +-
4 files changed, 57 insertions(+), 35 deletions(-)
diff --git a/python/mkdocs/docs/configuration.md b/python/mkdocs/docs/configuration.md
index 3780d580e2..23aec50554 100644
--- a/python/mkdocs/docs/configuration.md
+++ b/python/mkdocs/docs/configuration.md
@@ -35,14 +35,37 @@ export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
The environment variable picked up by Iceberg starts with `PYICEBERG_` and then follows the yaml structure below, where a double underscore `__` represents a nested field.
+## FileIO
+
+Iceberg works with the concept of a FileIO, which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
+
+- **s3**, **s3a**, **s3n**: `PyArrowFileIO`, `FsspecFileIO`
+- **gs**: `PyArrowFileIO`
+- **file**: `PyArrowFileIO`
+- **hdfs**: `PyArrowFileIO`
+- **abfs**, **abfss**: `FsspecFileIO`
+
+You can also set the FileIO explicitly:
+
+| Key | Example | Description |
+|----------------------|----------------------------------|-------------------------------------------------------------------------------------------------|
+| py-io-impl | pyiceberg.io.fsspec.FsspecFileIO | Sets the FileIO explicitly to an implementation, and will fail explicitly if it can't be loaded |
+
For the FileIO there are several configuration options available:
+### S3
+
| Key | Example | Description |
|--------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| s3.endpoint | https://10.0.19.25/ | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.signer | bearer | Configure the signature version of the FileIO. |
+
+### Azure Data Lake
+
+| Key | Example | Description |
+|--------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| adlfs.endpoint | http://127.0.0.1/ | Configure an alternative endpoint of the ADLFS service for the FileIO to access. This could be used to use FileIO with any adlfs-compatible object storage service that has a different endpoint (like [azurite](https://github.com/azure/azurite)). |
| adlfs.account-name | devstoreaccount1 | Configure the static storage account name used to access the FileIO. |
| adlfs.account-key | Eby8vdM02xNOcqF... | Configure the static storage account key used to access the FileIO. |
diff --git a/python/pyiceberg/io/__init__.py b/python/pyiceberg/io/__init__.py
index fd86bf5c91..9690e53c67 100644
--- a/python/pyiceberg/io/__init__.py
+++ b/python/pyiceberg/io/__init__.py
@@ -254,10 +254,10 @@ FSSPEC_FILE_IO = "pyiceberg.io.fsspec.FsspecFileIO"
# Mappings from the URI scheme to the Python FileIO implementations. The list is ordered by preference.
# If an implementation isn't installed, it will fall back to the next one.
SCHEMA_TO_FILE_IO: Dict[str, List[str]] = {
- "s3": [FSSPEC_FILE_IO, ARROW_FILE_IO],
- "s3a": [FSSPEC_FILE_IO, ARROW_FILE_IO],
- "s3n": [FSSPEC_FILE_IO, ARROW_FILE_IO],
- "gcs": [ARROW_FILE_IO],
+ "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
+ "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
+ "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
+ "gs": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO],
"hdfs": [ARROW_FILE_IO],
"abfs": [FSSPEC_FILE_IO],
diff --git a/python/tests/catalog/test_glue.py b/python/tests/catalog/test_glue.py
index bdf64b9ef1..26825c052a 100644
--- a/python/tests/catalog/test_glue.py
+++ b/python/tests/catalog/test_glue.py
@@ -52,7 +52,7 @@ def test_create_table_with_database_location(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == identifier
@@ -64,7 +64,7 @@ def test_create_table_with_default_warehouse(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == identifier
@@ -76,7 +76,7 @@ def test_create_table_with_given_location(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(
identifier=identifier, schema=table_schema_nested, location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
@@ -90,7 +90,7 @@ def test_create_table_with_no_location(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name)
with pytest.raises(ValueError):
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)
@@ -101,7 +101,7 @@ def test_create_table_with_strips(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db/"})
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == identifier
@@ -113,7 +113,7 @@ def test_create_table_with_strips_bucket_root(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}/")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
table_strip = test_catalog.create_table(identifier, table_schema_nested)
assert table_strip.identifier == identifier
@@ -125,7 +125,7 @@ def test_create_table_with_no_database(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
with pytest.raises(NoSuchNamespaceError):
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)
@@ -135,7 +135,7 @@ def test_create_duplicated_table(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
@@ -147,7 +147,7 @@ def test_load_table(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
@@ -158,7 +158,7 @@ def test_load_table(
@mock_glue
def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
@@ -169,7 +169,7 @@ def test_drop_table(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
@@ -183,7 +183,7 @@ def test_drop_table(
@mock_glue
def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
with pytest.raises(NoSuchTableError):
test_catalog.drop_table(identifier)
@@ -195,7 +195,7 @@ def test_rename_table(
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (database_name, new_table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == identifier
@@ -216,7 +216,7 @@ def test_rename_table_no_params(_glue, _bucket_initialize: None, _patch_aiobotoc
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (new_database_name, new_table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_namespace(namespace=new_database_name)
_glue.create_table(
@@ -233,7 +233,7 @@ def test_rename_non_iceberg_table(_glue, _bucket_initialize: None, _patch_aiobot
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (new_database_name, new_table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_namespace(namespace=new_database_name)
_glue.create_table(
@@ -257,7 +257,7 @@ def test_list_tables(
table_name: str,
table_list: List[str],
) -> None:
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
for table_name in table_list:
test_catalog.create_table((database_name, table_name), table_schema_nested)
@@ -268,7 +268,7 @@ def test_list_tables(
@mock_glue
def test_list_namespaces(_bucket_initialize: None, _patch_aiobotocore: None, database_list: List[str]) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
for database_name in database_list:
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
@@ -278,7 +278,7 @@ def test_list_namespaces(_bucket_initialize: None, _patch_aiobotocore: None, dat
@mock_glue
def test_create_namespace_no_properties(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
@@ -296,7 +296,7 @@ def test_create_namespace_with_comment_and_location(
"comment": "this is a test description",
"location": test_location,
}
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name, properties=test_properties)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
@@ -308,7 +308,7 @@ def test_create_namespace_with_comment_and_location(
@mock_glue
def test_create_duplicated_namespace(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
@@ -319,7 +319,7 @@ def test_create_duplicated_namespace(_bucket_initialize: None, _patch_aiobotocor
@mock_glue
def test_drop_namespace(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
@@ -334,7 +334,7 @@ def test_drop_non_empty_namespace(
_bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
- test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
assert len(test_catalog.list_tables(database_name)) == 1
@@ -344,7 +344,7 @@ def test_drop_non_empty_namespace(
@mock_glue
def test_drop_non_exist_namespace(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
with pytest.raises(NoSuchNamespaceError):
test_catalog.drop_namespace(database_name)
@@ -359,7 +359,7 @@ def test_load_namespace_properties(_bucket_initialize: None, _patch_aiobotocore:
"test_property2": "2",
"test_property3": "3",
}
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(database_name, test_properties)
listed_properties = test_catalog.load_namespace_properties(database_name)
for k, v in listed_properties.items():
@@ -369,7 +369,7 @@ def test_load_namespace_properties(_bucket_initialize: None, _patch_aiobotocore:
@mock_glue
def test_load_non_exist_namespace_properties(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
with pytest.raises(NoSuchNamespaceError):
test_catalog.load_namespace_properties(database_name)
@@ -385,7 +385,7 @@ def test_update_namespace_properties(_bucket_initialize: None, _patch_aiobotocor
}
removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(database_name, test_properties)
update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
for k in updates.keys():
@@ -401,7 +401,7 @@ def test_update_namespace_properties(_bucket_initialize: None, _patch_aiobotocor
@mock_glue
def test_load_empty_namespace_properties(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str) -> None:
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(database_name)
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}
@@ -411,7 +411,7 @@ def test_load_empty_namespace_properties(_bucket_initialize: None, _patch_aiobot
def test_load_default_namespace_properties(_glue, _bucket_initialize, _patch_aiobotocore, database_name: str) -> None: # type: ignore
# simulate creating database with default settings through AWS Glue Web Console
_glue.create_database(DatabaseInput={"Name": database_name})
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}
@@ -429,7 +429,7 @@ def test_update_namespace_properties_overlap_update_removal(
}
removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
updates = {"test_property1": "4", "test_property5": "5", "comment": "updated test description"}
- test_catalog = GlueCatalog("glue")
+ test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"})
test_catalog.create_namespace(database_name, test_properties)
with pytest.raises(ValueError):
test_catalog.update_namespace_properties(database_name, removals, updates)
diff --git a/python/tests/io/test_io.py b/python/tests/io/test_io.py
index 6cb5dbc7d3..c872ae1c7c 100644
--- a/python/tests/io/test_io.py
+++ b/python/tests/io/test_io.py
@@ -26,7 +26,6 @@ from pyiceberg.io import (
_import_file_io,
load_file_io,
)
-from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -277,7 +276,7 @@ def test_load_file_io_does_not_exist() -> None:
def test_load_file_io_warehouse() -> None:
- assert isinstance(load_file_io({"warehouse": "s3://some-path/"}), FsspecFileIO)
+ assert isinstance(load_file_io({"warehouse": "s3://some-path/"}), PyArrowFileIO)
def test_load_file_io_location() -> None: