You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/08 00:27:26 UTC
[arrow-datafusion-python] branch master updated: Add `register_object_store` to `SessionContext` (#55)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new 9d2b974 Add `register_object_store` to `SessionContext` (#55)
9d2b974 is described below
commit 9d2b974d763e3bcdee2f45f655b4213aa3f3928a
Author: Will Eaton <ws...@users.noreply.github.com>
AuthorDate: Mon Nov 7 19:27:22 2022 -0500
Add `register_object_store` to `SessionContext` (#55)
* first pass
* remove function
* cleanup
* add doc stub
* remove superfluous move
* ensure a default region is set (us-east-1)
* refactor API
* make "host" optional; more closely resemble rust API
* nit: change module name
* add unified API via &PyAny
* add azure support
* fix error message
* drop url as dep
* run cargo fmt
* bump to 0.5.1 of object_store
* add local store; test register store
* move import to new line to make the linter happy
---
.gitignore | 1 +
Cargo.lock | 370 +++++++++++++++++++++
Cargo.toml | 1 +
datafusion/object_store.py | 23 ++
datafusion/tests/test_store.py | 46 +++
.../python/generated/datafusion.SessionContext.rst | 1 +
src/context.rs | 40 ++-
src/lib.rs | 5 +
src/store.rs | 260 +++++++++++++++
9 files changed, 746 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index cbd980e..5b6cf36 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ venv
apache-rat-*.jar
*rat.txt
+.env
diff --git a/Cargo.lock b/Cargo.lock
index bfb5f12..fbce888 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -318,6 +318,7 @@ dependencies = [
"iana-time-zone",
"num-integer",
"num-traits",
+ "serde",
"winapi",
]
@@ -597,6 +598,7 @@ dependencies = [
"datafusion-expr",
"futures",
"mimalloc",
+ "object_store",
"pyo3",
"rand 0.7.3",
"tokio",
@@ -650,6 +652,15 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
+[[package]]
+name = "encoding_rs"
+version = "0.8.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
+dependencies = [
+ "cfg-if",
+]
+
[[package]]
name = "fastrand"
version = "1.8.0"
@@ -680,6 +691,12 @@ dependencies = [
"miniz_oxide",
]
+[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@@ -816,6 +833,25 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
+[[package]]
+name = "h2"
+version = "0.3.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be"
+dependencies = [
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "futures-util",
+ "http",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
[[package]]
name = "half"
version = "2.1.0"
@@ -850,6 +886,77 @@ dependencies = [
"libc",
]
+[[package]]
+name = "http"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa 1.0.4",
+]
+
+[[package]]
+name = "http-body"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
+dependencies = [
+ "bytes",
+ "http",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "httparse"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
+
+[[package]]
+name = "httpdate"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
+
+[[package]]
+name = "hyper"
+version = "0.14.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa 1.0.4",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
+[[package]]
+name = "hyper-rustls"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
+dependencies = [
+ "http",
+ "hyper",
+ "rustls",
+ "tokio",
+ "tokio-rustls",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.51"
@@ -915,6 +1022,12 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
+[[package]]
+name = "ipnet"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
+
[[package]]
name = "itertools"
version = "0.10.5"
@@ -1146,6 +1259,12 @@ dependencies = [
"libmimalloc-sys",
]
+[[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
[[package]]
name = "miniz_oxide"
version = "0.5.4"
@@ -1155,6 +1274,18 @@ dependencies = [
"adler",
]
+[[package]]
+name = "mio"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
+dependencies = [
+ "libc",
+ "log",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys",
+]
+
[[package]]
name = "multiversion"
version = "0.6.1"
@@ -1269,12 +1400,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56ce10a205d9f610ae3532943039c34c145930065ce0c4284134c897fe6073b1"
dependencies = [
"async-trait",
+ "base64",
"bytes",
"chrono",
"futures",
"itertools",
"parking_lot",
"percent-encoding",
+ "quick-xml",
+ "rand 0.8.5",
+ "reqwest",
+ "ring",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
"snafu",
"tokio",
"tracing",
@@ -1466,6 +1605,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88"
+[[package]]
+name = "quick-xml"
+version = "0.25.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "58e21a144a0ffb5fad7b464babcdab934a325ad69b7c0373bcfef5cbd9799ca9"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
[[package]]
name = "quote"
version = "1.0.21"
@@ -1587,12 +1736,88 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "reqwest"
+version = "0.11.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
+dependencies = [
+ "base64",
+ "bytes",
+ "encoding_rs",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "hyper",
+ "hyper-rustls",
+ "ipnet",
+ "js-sys",
+ "log",
+ "mime",
+ "once_cell",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustls",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "tokio",
+ "tokio-rustls",
+ "tokio-util",
+ "tower-service",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+ "webpki-roots",
+ "winreg",
+]
+
+[[package]]
+name = "ring"
+version = "0.16.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
+dependencies = [
+ "cc",
+ "libc",
+ "once_cell",
+ "spin",
+ "untrusted",
+ "web-sys",
+ "winapi",
+]
+
[[package]]
name = "rle-decode-fast"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
+[[package]]
+name = "rustls"
+version = "0.20.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033"
+dependencies = [
+ "log",
+ "ring",
+ "sct",
+ "webpki",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
+dependencies = [
+ "base64",
+]
+
[[package]]
name = "rustversion"
version = "1.0.9"
@@ -1626,6 +1851,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
+[[package]]
+name = "sct"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
[[package]]
name = "seq-macro"
version = "0.3.1"
@@ -1663,6 +1898,18 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa 1.0.4",
+ "ryu",
+ "serde",
+]
+
[[package]]
name = "sha2"
version = "0.10.6"
@@ -1717,6 +1964,22 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451"
+[[package]]
+name = "socket2"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
+name = "spin"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+
[[package]]
name = "sqlparser"
version = "0.25.0"
@@ -1860,11 +2123,15 @@ checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg",
"bytes",
+ "libc",
"memchr",
+ "mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
+ "socket2",
"tokio-macros",
+ "winapi",
]
[[package]]
@@ -1878,6 +2145,17 @@ dependencies = [
"syn",
]
+[[package]]
+name = "tokio-rustls"
+version = "0.23.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
+dependencies = [
+ "rustls",
+ "tokio",
+ "webpki",
+]
+
[[package]]
name = "tokio-stream"
version = "0.1.11"
@@ -1889,6 +2167,26 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-util"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "tower-service"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+
[[package]]
name = "tracing"
version = "0.1.37"
@@ -1921,6 +2219,12 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "try-lock"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
+
[[package]]
name = "typed-builder"
version = "0.10.0"
@@ -1977,6 +2281,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112"
+[[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
[[package]]
name = "url"
version = "2.3.1"
@@ -2024,6 +2334,16 @@ dependencies = [
"winapi-util",
]
+[[package]]
+name = "want"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
+dependencies = [
+ "log",
+ "try-lock",
+]
+
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@@ -2061,6 +2381,18 @@ dependencies = [
"wasm-bindgen-shared",
]
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.33"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
+]
+
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
@@ -2090,6 +2422,35 @@ version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
+[[package]]
+name = "web-sys"
+version = "0.3.60"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "webpki"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
+[[package]]
+name = "webpki-roots"
+version = "0.22.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
+dependencies = [
+ "webpki",
+]
+
[[package]]
name = "winapi"
version = "0.3.9"
@@ -2164,6 +2525,15 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
+[[package]]
+name = "winreg"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
+dependencies = [
+ "winapi",
+]
+
[[package]]
name = "zerocopy"
version = "0.6.1"
diff --git a/Cargo.toml b/Cargo.toml
index 262756f..afdb9a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", optional = true, default-features = false }
async-trait = "0.1"
futures = "0.3"
+object_store = { version = "0.5.1", features = ["aws", "gcp", "azure"] }
[lib]
name = "datafusion_python"
diff --git a/datafusion/object_store.py b/datafusion/object_store.py
new file mode 100644
index 0000000..70ecbd2
--- /dev/null
+++ b/datafusion/object_store.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from ._internal import object_store
+
+
+def __getattr__(name):
+ return getattr(object_store, name)
diff --git a/datafusion/tests/test_store.py b/datafusion/tests/test_store.py
new file mode 100644
index 0000000..d6f0db5
--- /dev/null
+++ b/datafusion/tests/test_store.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+
+from datafusion import SessionContext
+from datafusion.object_store import LocalFileSystem
+
+
+@pytest.fixture
+def local():
+ return LocalFileSystem()
+
+
+@pytest.fixture
+def ctx(local):
+ ctx = SessionContext()
+ ctx.register_object_store("local", local, None)
+ return ctx
+
+
+def test_read_parquet(ctx):
+ ctx.register_parquet(
+ "test",
+ f"file://{os.getcwd()}/testing/data/parquet",
+ [],
+ True,
+ ".parquet",
+ )
+ df = ctx.sql("SELECT * FROM test")
+ assert isinstance(df.collect(), list)
diff --git a/docs/source/python/generated/datafusion.SessionContext.rst b/docs/source/python/generated/datafusion.SessionContext.rst
index 137e231..3975325 100644
--- a/docs/source/python/generated/datafusion.SessionContext.rst
+++ b/docs/source/python/generated/datafusion.SessionContext.rst
@@ -36,6 +36,7 @@ datafusion.SessionContext
~SessionContext.deregister_table
~SessionContext.empty_table
~SessionContext.register_csv
+ ~SessionContext.register_object_store
~SessionContext.register_parquet
~SessionContext.register_record_batches
~SessionContext.register_table
diff --git a/src/context.rs b/src/context.rs
index 21b3f06..93ee1c7 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashSet;
use std::path::PathBuf;
-use std::{collections::HashSet, sync::Arc};
+use std::sync::Arc;
+use object_store::ObjectStore;
use uuid::Uuid;
use pyo3::exceptions::{PyKeyError, PyValueError};
@@ -35,6 +37,7 @@ use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::errors::DataFusionError;
+use crate::store::StorageContexts;
use crate::udaf::PyAggregateUDF;
use crate::udf::PyScalarUDF;
use crate::utils::wait_for_future;
@@ -93,6 +96,41 @@ impl PySessionContext {
}
}
+    /// Register an object store under the given URL scheme.
+    ///
+    /// `store` must be one of the wrapper classes covered by `StorageContexts`;
+    /// anything else is reported to Python as a `ValueError`. When `host` is `None`,
+    /// the bucket or container name carried by the store is used instead (an empty
+    /// string for the local file system).
+    fn register_object_store(
+        &mut self,
+        scheme: &str,
+        store: &PyAny,
+        host: Option<&str>,
+    ) -> PyResult<()> {
+        // the extraction error itself is discarded in favour of a fixed message
+        let context = StorageContexts::extract(store)
+            .map_err(|_| PyValueError::new_err("Invalid object store"))?;
+
+        // for most stores the "host" is the bucket name and can be inferred from the store
+        let (store, upstream_host): (Arc<dyn ObjectStore>, String) = match context {
+            StorageContexts::AmazonS3(s3) => (s3.inner, s3.bucket_name),
+            StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
+            StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
+            StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
+        };
+
+        // let users override the host to match the api signature from upstream
+        let derived_host = host.unwrap_or(upstream_host.as_str());
+
+        let runtime = self.ctx.runtime_env();
+        runtime.register_object_store(scheme, derived_host, store);
+        Ok(())
+    }
+
/// Returns a PyDataFrame whose plan corresponds to the SQL statement.
fn sql(&mut self, query: &str, py: Python) -> PyResult<PyDataFrame> {
let result = self.ctx.sql(query);
diff --git a/src/lib.rs b/src/lib.rs
index 0423fb1..28544f5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,6 +35,7 @@ mod expression;
#[allow(clippy::borrow_deref_ref)]
mod functions;
mod pyarrow_filter_expression;
+pub mod store;
#[allow(clippy::borrow_deref_ref)]
mod udaf;
#[allow(clippy::borrow_deref_ref)]
@@ -67,5 +68,9 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
functions::init_module(funcs)?;
m.add_submodule(funcs)?;
+ let store = PyModule::new(py, "object_store")?;
+ store::init_module(store)?;
+ m.add_submodule(store)?;
+
Ok(())
}
diff --git a/src/store.rs b/src/store.rs
new file mode 100644
index 0000000..2e8c9eb
--- /dev/null
+++ b/src/store.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use pyo3::prelude::*;
+
+use object_store::aws::{AmazonS3, AmazonS3Builder};
+use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
+use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
+use object_store::local::LocalFileSystem;
+
+/// Object store wrappers that can be handed over from Python.
+///
+/// `FromPyObject` is derived so that a Python object can be extracted into the
+/// variant wrapping the matching `#[pyclass]` defined in this module.
+#[derive(FromPyObject)]
+pub enum StorageContexts {
+ /// Wrapper holding an `object_store::aws::AmazonS3` store and its bucket name.
+ AmazonS3(PyAmazonS3Context),
+ /// Wrapper holding an `object_store::gcp::GoogleCloudStorage` store and its bucket name.
+ GoogleCloudStorage(PyGoogleCloudContext),
+ /// Wrapper holding an `object_store::azure::MicrosoftAzure` store and its container name.
+ MicrosoftAzure(PyMicrosoftAzureContext),
+ /// Wrapper holding an `object_store::local::LocalFileSystem` store.
+ LocalFileSystem(PyLocalFileSystemContext),
+}
+
+/// Python-facing wrapper around [`LocalFileSystem`].
+///
+/// Exposed to Python as `datafusion.object_store.LocalFileSystem`.
+#[pyclass(
+    name = "LocalFileSystem",
+    module = "datafusion.object_store",
+    subclass,
+    unsendable
+)]
+#[derive(Debug, Clone)]
+pub struct PyLocalFileSystemContext {
+    /// Shared handle to the underlying store.
+    pub inner: Arc<LocalFileSystem>,
+}
+
+#[pymethods]
+impl PyLocalFileSystemContext {
+    /// Creates a local file system store.
+    ///
+    /// With `prefix` set the store is built via `LocalFileSystem::new_with_prefix`;
+    /// otherwise `LocalFileSystem::new` is used.
+    ///
+    /// # Errors
+    ///
+    /// Raises a Python `ValueError` (rather than panicking) when the store cannot
+    /// be created for the given prefix.
+    #[args(prefix = "None")]
+    #[new]
+    fn new(prefix: Option<String>) -> PyResult<Self> {
+        let store = match prefix {
+            Some(prefix) => LocalFileSystem::new_with_prefix(prefix).map_err(|e| {
+                pyo3::exceptions::PyValueError::new_err(format!(
+                    "Could not create local LocalFileSystem: {}",
+                    e
+                ))
+            })?,
+            None => LocalFileSystem::new(),
+        };
+
+        Ok(Self {
+            inner: Arc::new(store),
+        })
+    }
+}
+
+/// Python-facing wrapper around [`MicrosoftAzure`].
+///
+/// Exposed to Python as `datafusion.object_store.MicrosoftAzure`.
+#[pyclass(
+    name = "MicrosoftAzure",
+    module = "datafusion.object_store",
+    subclass,
+    unsendable
+)]
+#[derive(Debug, Clone)]
+pub struct PyMicrosoftAzureContext {
+    /// Shared handle to the underlying store.
+    pub inner: Arc<MicrosoftAzure>,
+    /// Container the store was built for.
+    pub container_name: String,
+}
+
+#[pymethods]
+impl PyMicrosoftAzureContext {
+    /// Creates an Azure store for `container_name`.
+    ///
+    /// The builder starts from `MicrosoftAzureBuilder::from_env()`; every argument
+    /// that is not `None` is then applied on top of it. `client_id`,
+    /// `client_secret` and `tenant_id` are only meaningful together.
+    ///
+    /// # Errors
+    ///
+    /// Raises a Python `ValueError` (rather than panicking) when only some of
+    /// `client_id`, `client_secret`, `tenant_id` are given, or when the builder
+    /// fails to produce a store.
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        account = "None",
+        access_key = "None",
+        bearer_token = "None",
+        client_id = "None",
+        client_secret = "None",
+        tenant_id = "None",
+        sas_query_pairs = "None",
+        use_emulator = "None",
+        allow_http = "None"
+    )]
+    #[new]
+    fn new(
+        container_name: String,
+        account: Option<String>,
+        access_key: Option<String>,
+        bearer_token: Option<String>,
+        client_id: Option<String>,
+        client_secret: Option<String>,
+        tenant_id: Option<String>,
+        sas_query_pairs: Option<Vec<(String, String)>>,
+        use_emulator: Option<bool>,
+        allow_http: Option<bool>,
+    ) -> PyResult<Self> {
+        let mut builder = MicrosoftAzureBuilder::from_env().with_container_name(&container_name);
+
+        if let Some(account) = account {
+            builder = builder.with_account(account);
+        }
+
+        if let Some(access_key) = access_key {
+            builder = builder.with_access_key(access_key);
+        }
+
+        if let Some(bearer_token) = bearer_token {
+            builder = builder.with_bearer_token_authorization(bearer_token);
+        }
+
+        // client secret authorization needs all three values; a partial set is a caller error
+        match (client_id, client_secret, tenant_id) {
+            (Some(client_id), Some(client_secret), Some(tenant_id)) => {
+                builder =
+                    builder.with_client_secret_authorization(client_id, client_secret, tenant_id);
+            }
+            (None, None, None) => {}
+            _ => {
+                return Err(pyo3::exceptions::PyValueError::new_err(
+                    "client_id, client_secret, tenant_id must be all set or all None",
+                ));
+            }
+        }
+
+        if let Some(sas_query_pairs) = sas_query_pairs {
+            builder = builder.with_sas_authorization(sas_query_pairs);
+        }
+
+        if let Some(use_emulator) = use_emulator {
+            builder = builder.with_use_emulator(use_emulator);
+        }
+
+        if let Some(allow_http) = allow_http {
+            builder = builder.with_allow_http(allow_http);
+        }
+
+        let store = builder.build().map_err(|e| {
+            pyo3::exceptions::PyValueError::new_err(format!(
+                "Could not create Azure Storage context: {}",
+                e
+            ))
+        })?;
+
+        Ok(Self {
+            inner: Arc::new(store),
+            container_name,
+        })
+    }
+}
+
+/// Python-facing wrapper around [`GoogleCloudStorage`].
+///
+/// Exposed to Python as `datafusion.object_store.GoogleCloud`.
+#[pyclass(
+    name = "GoogleCloud",
+    module = "datafusion.object_store",
+    subclass,
+    unsendable
+)]
+#[derive(Debug, Clone)]
+pub struct PyGoogleCloudContext {
+    /// Shared handle to the underlying store.
+    pub inner: Arc<GoogleCloudStorage>,
+    /// Bucket the store was built for.
+    pub bucket_name: String,
+}
+
+#[pymethods]
+impl PyGoogleCloudContext {
+    /// Creates a Google Cloud Storage store for `bucket_name`.
+    ///
+    /// `service_account_path`, when given, is passed to the builder's
+    /// `with_service_account_path`.
+    ///
+    /// # Errors
+    ///
+    /// Raises a Python `ValueError` (rather than panicking) when the builder
+    /// fails to produce a store.
+    #[args(service_account_path = "None")]
+    #[new]
+    fn new(bucket_name: String, service_account_path: Option<String>) -> PyResult<Self> {
+        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(&bucket_name);
+
+        if let Some(credential_path) = service_account_path {
+            builder = builder.with_service_account_path(credential_path);
+        }
+
+        let store = builder.build().map_err(|e| {
+            pyo3::exceptions::PyValueError::new_err(format!(
+                "Could not create Google Cloud Storage: {}",
+                e
+            ))
+        })?;
+
+        Ok(Self {
+            inner: Arc::new(store),
+            bucket_name,
+        })
+    }
+}
+
+/// Python-facing wrapper around [`AmazonS3`].
+///
+/// Exposed to Python as `datafusion.object_store.AmazonS3`.
+#[pyclass(
+    name = "AmazonS3",
+    module = "datafusion.object_store",
+    subclass,
+    unsendable
+)]
+#[derive(Debug, Clone)]
+pub struct PyAmazonS3Context {
+    /// Shared handle to the underlying store.
+    pub inner: Arc<AmazonS3>,
+    /// Bucket the store was built for.
+    pub bucket_name: String,
+}
+
+#[pymethods]
+impl PyAmazonS3Context {
+    /// Creates an S3 store for `bucket_name`.
+    ///
+    /// The builder starts from `AmazonS3Builder::from_env()`; `region`,
+    /// `access_key_id`, `secret_access_key` and `endpoint` are applied on top of it
+    /// when they are not `None`. `imdsv1_fallback` enables the builder's IMDSv1
+    /// fallback when `true`, and `allow_http` is always forwarded to the builder.
+    ///
+    /// # Errors
+    ///
+    /// Raises a Python `ValueError` (rather than panicking) when the builder
+    /// fails to produce a store.
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        region = "None",
+        access_key_id = "None",
+        secret_access_key = "None",
+        endpoint = "None",
+        imdsv1_fallback = "false",
+        allow_http = "false"
+    )]
+    #[new]
+    fn new(
+        bucket_name: String,
+        region: Option<String>,
+        access_key_id: Option<String>,
+        secret_access_key: Option<String>,
+        endpoint: Option<String>,
+        //retry_config: RetryConfig,
+        allow_http: bool,
+        imdsv1_fallback: bool,
+    ) -> PyResult<Self> {
+        // start w/ the options that come directly from the environment
+        let mut builder = AmazonS3Builder::from_env();
+
+        if let Some(region) = region {
+            builder = builder.with_region(region);
+        }
+
+        if let Some(access_key_id) = access_key_id {
+            builder = builder.with_access_key_id(access_key_id);
+        }
+
+        if let Some(secret_access_key) = secret_access_key {
+            builder = builder.with_secret_access_key(secret_access_key);
+        }
+
+        if let Some(endpoint) = endpoint {
+            builder = builder.with_endpoint(endpoint);
+        }
+
+        if imdsv1_fallback {
+            builder = builder.with_imdsv1_fallback();
+        }
+
+        let store = builder
+            .with_bucket_name(bucket_name.clone())
+            //.with_retry_config(retry_config) #TODO: add later
+            .with_allow_http(allow_http)
+            .build()
+            .map_err(|e| {
+                pyo3::exceptions::PyValueError::new_err(format!(
+                    "failed to build AmazonS3: {}",
+                    e
+                ))
+            })?;
+
+        Ok(Self {
+            inner: Arc::new(store),
+            bucket_name,
+        })
+    }
+}
+
+/// Adds the four object store wrapper classes defined in this file to the Python module `m`.
+///
+/// Stops at, and returns, the first error reported by `add_class`.
+pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+ m.add_class::<PyAmazonS3Context>()?;
+ m.add_class::<PyMicrosoftAzureContext>()?;
+ m.add_class::<PyGoogleCloudContext>()?;
+ m.add_class::<PyLocalFileSystemContext>()?;
+ Ok(())
+}