Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/10 17:07:06 UTC
[arrow-datafusion] branch master updated: ObjectStore API to read
from remote storage systems (#950)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6f53180 ObjectStore API to read from remote storage systems (#950)
6f53180 is described below
commit 6f531807176e49110c33a01722014552024fa412
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Sep 11 01:06:57 2021 +0800
ObjectStore API to read from remote storage systems (#950)
* Object Store API to read from remote storage systems
* add tokio fs
* resolve comments
* fmt
* resolve comments
* fix test
* rerun
* file_reader async
* add delimiter option in list
* fix fmt
* file_reader to sync
* An optional list_dir api
---
.../core/src/serde/physical_plan/from_proto.rs | 2 +
datafusion/Cargo.toml | 2 +-
datafusion/src/datasource/mod.rs | 1 +
datafusion/src/datasource/object_store/local.rs | 177 +++++++++++++++++++++
datafusion/src/datasource/object_store/mod.rs | 151 ++++++++++++++++++
datafusion/src/execution/context.rs | 28 ++++
6 files changed, 360 insertions(+), 1 deletion(-)
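
For orientation before the diffs: a consumer resolves a store from the registry by URI scheme and then streams file metadata from it. A minimal usage sketch, not part of the commit itself (the main wrapper and the /tmp/data path are illustrative):

use datafusion::datasource::object_store::ObjectStoreRegistry;
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // The registry pre-registers LocalFileSystem under the "file" scheme.
    let registry = ObjectStoreRegistry::new();
    let store = registry.get_by_uri("file:///tmp/data")?;

    // list_file yields a stream of Result<FileMeta>.
    // Illustrative path; any local file or directory works.
    let mut files = store.list_file("/tmp/data").await?;
    while let Some(file) = files.next().await {
        let file = file?;
        println!("{} ({} bytes)", file.path, file.size);
    }
    Ok(())
}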
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 522bac2..6aa0fa1 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -35,6 +35,7 @@ use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::datasource::FilePartition;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
@@ -655,6 +656,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
+ object_store_registry: Arc::new(ObjectStoreRegistry::new()),
};
let fun_expr = functions::create_physical_fun(
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index c9ab943..f30db02 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -58,7 +58,7 @@ chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.0"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
tokio-stream = "0.1"
log = "^0.4"
md-5 = { version = "^0.9.1", optional = true }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index d5e2952..df3328e 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -22,6 +22,7 @@ pub mod datasource;
pub mod empty;
pub mod json;
pub mod memory;
+pub mod object_store;
pub mod parquet;
pub use self::csv::{CsvFile, CsvReadOptions};
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
new file mode 100644
index 0000000..2b27f6c
--- /dev/null
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store that represents the Local File System.
+
+use std::fs::Metadata;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::{stream, AsyncRead, StreamExt};
+
+use crate::datasource::object_store::{
+ FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+};
+use crate::error::DataFusionError;
+use crate::error::Result;
+
+#[derive(Debug)]
+/// Local File System as Object Store.
+pub struct LocalFileSystem;
+
+#[async_trait]
+impl ObjectStore for LocalFileSystem {
+ async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+ list_all(prefix.to_owned()).await
+ }
+
+ async fn list_dir(
+ &self,
+ _prefix: &str,
+ _delimiter: Option<String>,
+ ) -> Result<ListEntryStream> {
+ todo!()
+ }
+
+ fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
+ Ok(Arc::new(LocalFileReader::new(file)?))
+ }
+}
+
+struct LocalFileReader {
+ file: FileMeta,
+}
+
+impl LocalFileReader {
+ fn new(file: FileMeta) -> Result<Self> {
+ Ok(Self { file })
+ }
+}
+
+#[async_trait]
+impl ObjectReader for LocalFileReader {
+ async fn chunk_reader(
+ &self,
+ _start: u64,
+ _length: usize,
+ ) -> Result<Arc<dyn AsyncRead>> {
+ todo!()
+ }
+
+ fn length(&self) -> u64 {
+ self.file.size
+ }
+}
+
+async fn list_all(prefix: String) -> Result<FileMetaStream> {
+ fn get_meta(path: String, metadata: Metadata) -> FileMeta {
+ FileMeta {
+ path,
+ last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
+ size: metadata.len(),
+ }
+ }
+
+ async fn find_files_in_dir(
+ path: String,
+ to_visit: &mut Vec<String>,
+ ) -> Result<Vec<FileMeta>> {
+ let mut dir = tokio::fs::read_dir(path).await?;
+ let mut files = Vec::new();
+
+ while let Some(child) = dir.next_entry().await? {
+ if let Some(child_path) = child.path().to_str() {
+ let metadata = child.metadata().await?;
+ if metadata.is_dir() {
+ to_visit.push(child_path.to_string());
+ } else {
+ files.push(get_meta(child_path.to_owned(), metadata))
+ }
+ } else {
+ return Err(DataFusionError::Plan("Invalid path".to_string()));
+ }
+ }
+ Ok(files)
+ }
+
+ let prefix_meta = tokio::fs::metadata(&prefix).await?;
+ let prefix = prefix.to_owned();
+ if prefix_meta.is_file() {
+ Ok(Box::pin(stream::once(async move {
+ Ok(get_meta(prefix, prefix_meta))
+ })))
+ } else {
+ let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
+ match to_visit.pop() {
+ None => None,
+ Some(path) => {
+ let file_stream = match find_files_in_dir(path, &mut to_visit).await {
+ Ok(files) => stream::iter(files).map(Ok).left_stream(),
+ Err(e) => stream::once(async { Err(e) }).right_stream(),
+ };
+
+ Some((file_stream, to_visit))
+ }
+ }
+ })
+ .flatten();
+ Ok(Box::pin(result))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use futures::StreamExt;
+ use std::collections::HashSet;
+ use std::fs::create_dir;
+ use std::fs::File;
+ use tempfile::tempdir;
+
+ #[tokio::test]
+ async fn test_recursive_listing() -> Result<()> {
+ // tmp/a.txt
+ // tmp/x/b.txt
+ // tmp/y/c.txt
+ let tmp = tempdir()?;
+ let x_path = tmp.path().join("x");
+ let y_path = tmp.path().join("y");
+ let a_path = tmp.path().join("a.txt");
+ let b_path = x_path.join("b.txt");
+ let c_path = y_path.join("c.txt");
+ create_dir(&x_path)?;
+ create_dir(&y_path)?;
+ File::create(&a_path)?;
+ File::create(&b_path)?;
+ File::create(&c_path)?;
+
+ let mut all_files = HashSet::new();
+ let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
+ while let Some(file) = files.next().await {
+ let file = file?;
+ assert_eq!(file.size, 0);
+ all_files.insert(file.path);
+ }
+
+ assert_eq!(all_files.len(), 3);
+ assert!(all_files.contains(a_path.to_str().unwrap()));
+ assert!(all_files.contains(b_path.to_str().unwrap()));
+ assert!(all_files.contains(c_path.to_str().unwrap()));
+
+ Ok(())
+ }
+}
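
Note that both chunk_reader bodies in this commit are left as todo!(). One way the local variant could be filled in, sketched here only as an illustration and not as what the commit ships, is to adapt blocking std I/O to the AsyncRead interface with futures::io::AllowStdIo (the free function and its path parameter are hypothetical):

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use futures::io::AllowStdIo;
use futures::AsyncRead;

use datafusion::error::Result;

// Hypothetical helper: open `path`, seek to `start`, and expose at most
// `length` bytes through the futures AsyncRead interface.
fn local_chunk_reader(path: &str, start: u64, length: usize) -> Result<Arc<dyn AsyncRead>> {
    let mut file = File::open(path)?;
    file.seek(SeekFrom::Start(start))?;
    // Take limits the reader to `length` bytes; AllowStdIo adapts the
    // blocking reader to AsyncRead (the reads themselves stay blocking).
    Ok(Arc::new(AllowStdIo::new(file.take(length as u64))))
}

A shared Arc<dyn AsyncRead> cannot be polled without exclusive access to the reader, so callers would need to unwrap it or wrap it further; the trait signature looks like an early cut of the API.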
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
new file mode 100644
index 0000000..fd25fd4
--- /dev/null
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+pub mod local;
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use futures::{AsyncRead, Stream};
+
+use local::LocalFileSystem;
+
+use crate::error::{DataFusionError, Result};
+use chrono::Utc;
+
+/// Object Reader for one file in an object store
+#[async_trait]
+pub trait ObjectReader {
+ /// Get a reader for the byte range [start, start + length] of the file, asynchronously
+ async fn chunk_reader(&self, start: u64, length: usize)
+ -> Result<Arc<dyn AsyncRead>>;
+
+ /// Get the length of the file
+ fn length(&self) -> u64;
+}
+
+/// Represents a file or a prefix that may require further resolution
+#[derive(Debug)]
+pub enum ListEntry {
+ /// File metadata
+ FileMeta(FileMeta),
+ /// Prefix to be further resolved during partition discovery
+ Prefix(String),
+}
+
+/// File metadata obtained from an object store
+#[derive(Debug)]
+pub struct FileMeta {
+ /// Path of the file
+ pub path: String,
+ /// Last time the file was modified in UTC
+ pub last_modified: Option<chrono::DateTime<Utc>>,
+ /// Total size of the file, in bytes
+ pub size: u64,
+}
+
+/// Stream of files listed from an object store
+pub type FileMetaStream =
+ Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
+
+/// Stream of list entries obtained from an object store
+pub type ListEntryStream =
+ Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc.) to sources of bytes.
+#[async_trait]
+pub trait ObjectStore: Sync + Send + Debug {
+ /// Returns all the files under the path `prefix`
+ async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
+
+ /// Returns all the files under `prefix` if `prefix` is already a leaf directory,
+ /// or all paths between `prefix` and the first occurrence of `delimiter` when one is provided.
+ async fn list_dir(
+ &self,
+ prefix: &str,
+ delimiter: Option<String>,
+ ) -> Result<ListEntryStream>;
+
+ /// Get an object reader for one file
+ fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>>;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+/// A Registry holds all the object stores at runtime with a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+ /// A map from scheme to the object store that serves list/read operations for that store
+ pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+ /// Create the registry that object stores can be registered into.
+ /// A [`LocalFileSystem`] store is registered by default so that local files can be read natively.
+ pub fn new() -> Self {
+ let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+ map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+ Self {
+ object_stores: RwLock::new(map),
+ }
+ }
+
+ /// Adds a new store to this registry.
+ /// If a store was already registered for the same scheme, it is replaced in the registry and returned.
+ pub fn register_store(
+ &self,
+ scheme: String,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ let mut stores = self.object_stores.write().unwrap();
+ stores.insert(scheme, store)
+ }
+
+ /// Get the store registered for the given scheme
+ pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+ let stores = self.object_stores.read().unwrap();
+ stores.get(scheme).cloned()
+ }
+
+ /// Get a suitable store for the URI based on its scheme. For example:
+ /// a URI with scheme `file` or no scheme will return the default `LocalFileSystem` store,
+ /// while a URI with scheme `s3` will return the S3 store if one is registered.
+ pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
+ if let Some((scheme, _)) = uri.split_once(':') {
+ let stores = self.object_stores.read().unwrap();
+ stores
+ .get(&*scheme.to_lowercase())
+ .map(Clone::clone)
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "No suitable object store found for {}",
+ scheme
+ ))
+ })
+ } else {
+ Ok(Arc::new(LocalFileSystem))
+ }
+ }
+}
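
To plug in another backend, a crate implements the ObjectStore trait above and registers it under a scheme. A skeleton, where MyS3Store is a hypothetical type (the method bodies would wrap the corresponding storage calls):

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::datasource::object_store::{
    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
    ObjectStoreRegistry,
};
use datafusion::error::Result;

#[derive(Debug)]
struct MyS3Store; // hypothetical: would hold an S3 client and credentials

#[async_trait]
impl ObjectStore for MyS3Store {
    async fn list_file(&self, _prefix: &str) -> Result<FileMetaStream> {
        todo!("issue a paginated list request and stream FileMeta results")
    }

    async fn list_dir(
        &self,
        _prefix: &str,
        _delimiter: Option<String>,
    ) -> Result<ListEntryStream> {
        todo!("list with a delimiter, yielding ListEntry::Prefix for common prefixes")
    }

    fn file_reader(&self, _file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
        todo!("return a reader that fetches byte ranges from the object")
    }
}

fn register(registry: &ObjectStoreRegistry) {
    // After this, get_by_uri("s3://bucket/key") resolves to MyS3Store.
    registry.register_store("s3".to_string(), Arc::new(MyS3Store));
}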
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index da6de04..cbb2e73 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -49,6 +49,7 @@ use crate::catalog::{
ResolvedTableReference, TableReference,
};
use crate::datasource::csv::CsvFile;
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -164,6 +165,7 @@ impl ExecutionContext {
aggregate_functions: HashMap::new(),
config,
execution_props: ExecutionProps::new(),
+ object_store_registry: Arc::new(ObjectStoreRegistry::new()),
})),
}
}
@@ -363,6 +365,29 @@ impl ExecutionContext {
self.state.lock().unwrap().catalog_list.catalog(name)
}
+ /// Registers an object store under a scheme using a custom `ObjectStore` so that
+ /// an external file system or object storage system can be queried from this context.
+ ///
+ /// Returns the `ObjectStore` previously registered for this scheme, if any
+ pub fn register_object_store(
+ &self,
+ scheme: impl Into<String>,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ let scheme = scheme.into();
+
+ self.state
+ .lock()
+ .unwrap()
+ .object_store_registry
+ .register_store(scheme, object_store)
+ }
+
+ /// Retrieves an `ObjectStore` instance by scheme
+ pub fn object_store(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+ self.state.lock().unwrap().object_store_registry.get(scheme)
+ }
+
/// Registers a table using a custom `TableProvider` so that
/// it can be referenced from SQL statements executed against this
/// context.
@@ -849,6 +874,8 @@ pub struct ExecutionContextState {
pub config: ExecutionConfig,
/// Execution properties
pub execution_props: ExecutionProps,
+ /// Object stores that are registered with the context
+ pub object_store_registry: Arc<ObjectStoreRegistry>,
}
impl ExecutionProps {
@@ -876,6 +903,7 @@ impl ExecutionContextState {
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
+ object_store_registry: Arc::new(ObjectStoreRegistry::new()),
}
}
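
At the context level the same flow goes through the new ExecutionContext methods; a short sketch, reusing the hypothetical MyS3Store from the previous example:

use std::sync::Arc;

use datafusion::execution::context::ExecutionContext;

fn main() {
    let ctx = ExecutionContext::new();

    // Register under the "s3" scheme; any store previously registered
    // for that scheme would be returned.
    let previous = ctx.register_object_store("s3", Arc::new(MyS3Store));
    assert!(previous.is_none());

    // Retrieve the store again by scheme.
    let store = ctx.object_store("s3").expect("s3 store should be registered");
    println!("{:?}", store);
}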