Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/10 17:07:06 UTC

[arrow-datafusion] branch master updated: ObjectStore API to read from remote storage systems (#950)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f53180  ObjectStore API to read from remote storage systems (#950)
6f53180 is described below

commit 6f531807176e49110c33a01722014552024fa412
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Sep 11 01:06:57 2021 +0800

    ObjectStore API to read from remote storage systems (#950)
    
    * Object Store API to read from remote storage systems
    
    * add tokio fs
    
    * resolve comments
    
    * fmt
    
    * resolve comments
    
    * fix test
    
    * rerun
    
    * file_reader async
    
    * add delimiter option in list
    
    * fix fmt
    
    * file_reader to sync
    
    * An optional list_dir api
---
 .../core/src/serde/physical_plan/from_proto.rs     |   2 +
 datafusion/Cargo.toml                              |   2 +-
 datafusion/src/datasource/mod.rs                   |   1 +
 datafusion/src/datasource/object_store/local.rs    | 177 +++++++++++++++++++++
 datafusion/src/datasource/object_store/mod.rs      | 151 ++++++++++++++++++
 datafusion/src/execution/context.rs                |  28 ++++
 6 files changed, 360 insertions(+), 1 deletion(-)
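
For orientation, here is a rough sketch (not part of this patch) of the read flow the new trait is meant to enable. `store` stands in for any implementation of the `ObjectStore` trait added below, and the helper function itself is illustrative only:

    // List every file under a prefix, then sum the sizes reported by the
    // per-file readers obtained from the same store.
    use std::sync::Arc;

    use futures::StreamExt;

    use datafusion::datasource::object_store::ObjectStore;
    use datafusion::error::Result;

    async fn total_size(store: Arc<dyn ObjectStore>, prefix: &str) -> Result<u64> {
        let mut files = store.list_file(prefix).await?;
        let mut total = 0;
        while let Some(file) = files.next().await {
            let reader = store.file_reader(file?)?;
            total += reader.length();
        }
        Ok(total)
    }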

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 522bac2..6aa0fa1 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -35,6 +35,7 @@ use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
 use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::datasource::FilePartition;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
@@ -655,6 +656,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
                     aggregate_functions: Default::default(),
                     config: ExecutionConfig::new(),
                     execution_props: ExecutionProps::new(),
+                    object_store_registry: Arc::new(ObjectStoreRegistry::new()),
                 };
 
                 let fun_expr = functions::create_physical_fun(
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index c9ab943..f30db02 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -58,7 +58,7 @@ chrono = "0.4"
 async-trait = "0.1.41"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
 tokio-stream = "0.1"
 log = "^0.4"
 md-5 = { version = "^0.9.1", optional = true }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index d5e2952..df3328e 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -22,6 +22,7 @@ pub mod datasource;
 pub mod empty;
 pub mod json;
 pub mod memory;
+pub mod object_store;
 pub mod parquet;
 
 pub use self::csv::{CsvFile, CsvReadOptions};
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
new file mode 100644
index 0000000..2b27f6c
--- /dev/null
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store that represents the Local File System.
+
+use std::fs::Metadata;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::{stream, AsyncRead, StreamExt};
+
+use crate::datasource::object_store::{
+    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+};
+use crate::error::DataFusionError;
+use crate::error::Result;
+
+#[derive(Debug)]
+/// Local File System as Object Store.
+pub struct LocalFileSystem;
+
+#[async_trait]
+impl ObjectStore for LocalFileSystem {
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+        list_all(prefix.to_owned()).await
+    }
+
+    async fn list_dir(
+        &self,
+        _prefix: &str,
+        _delimiter: Option<String>,
+    ) -> Result<ListEntryStream> {
+        todo!()
+    }
+
+    fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
+        Ok(Arc::new(LocalFileReader::new(file)?))
+    }
+}
+
+struct LocalFileReader {
+    file: FileMeta,
+}
+
+impl LocalFileReader {
+    fn new(file: FileMeta) -> Result<Self> {
+        Ok(Self { file })
+    }
+}
+
+#[async_trait]
+impl ObjectReader for LocalFileReader {
+    async fn chunk_reader(
+        &self,
+        _start: u64,
+        _length: usize,
+    ) -> Result<Arc<dyn AsyncRead>> {
+        todo!()
+    }
+
+    fn length(&self) -> u64 {
+        self.file.size
+    }
+}
+
+async fn list_all(prefix: String) -> Result<FileMetaStream> {
+    fn get_meta(path: String, metadata: Metadata) -> FileMeta {
+        FileMeta {
+            path,
+            last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
+            size: metadata.len(),
+        }
+    }
+
+    async fn find_files_in_dir(
+        path: String,
+        to_visit: &mut Vec<String>,
+    ) -> Result<Vec<FileMeta>> {
+        let mut dir = tokio::fs::read_dir(path).await?;
+        let mut files = Vec::new();
+
+        while let Some(child) = dir.next_entry().await? {
+            if let Some(child_path) = child.path().to_str() {
+                let metadata = child.metadata().await?;
+                if metadata.is_dir() {
+                    to_visit.push(child_path.to_string());
+                } else {
+                    files.push(get_meta(child_path.to_owned(), metadata))
+                }
+            } else {
+                return Err(DataFusionError::Plan("Invalid path".to_string()));
+            }
+        }
+        Ok(files)
+    }
+
+    let prefix_meta = tokio::fs::metadata(&prefix).await?;
+    let prefix = prefix.to_owned();
+    if prefix_meta.is_file() {
+        Ok(Box::pin(stream::once(async move {
+            Ok(get_meta(prefix, prefix_meta))
+        })))
+    } else {
+        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
+            match to_visit.pop() {
+                None => None,
+                Some(path) => {
+                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
+                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
+                        Err(e) => stream::once(async { Err(e) }).right_stream(),
+                    };
+
+                    Some((file_stream, to_visit))
+                }
+            }
+        })
+        .flatten();
+        Ok(Box::pin(result))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+    use std::collections::HashSet;
+    use std::fs::create_dir;
+    use std::fs::File;
+    use tempfile::tempdir;
+
+    #[tokio::test]
+    async fn test_recursive_listing() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        // tmp/y/c.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let y_path = tmp.path().join("y");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        let c_path = y_path.join("c.txt");
+        create_dir(&x_path)?;
+        create_dir(&y_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+        File::create(&c_path)?;
+
+        let mut all_files = HashSet::new();
+        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
+        while let Some(file) = files.next().await {
+            let file = file?;
+            assert_eq!(file.size, 0);
+            all_files.insert(file.path);
+        }
+
+        assert_eq!(all_files.len(), 3);
+        assert!(all_files.contains(a_path.to_str().unwrap()));
+        assert!(all_files.contains(b_path.to_str().unwrap()));
+        assert!(all_files.contains(c_path.to_str().unwrap()));
+
+        Ok(())
+    }
+}
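
A small usage sketch (not part of the commit) to complement the recursive-listing test above: going through the `ObjectStore` trait on `LocalFileSystem`, a prefix that is a plain file yields exactly one entry. The path used here is only an example:

    use futures::StreamExt;

    use datafusion::datasource::object_store::local::LocalFileSystem;
    use datafusion::datasource::object_store::ObjectStore;
    use datafusion::error::Result;

    async fn single_file_listing() -> Result<()> {
        let store = LocalFileSystem;
        // Any existing file path works; "/etc/hosts" is illustrative.
        let mut files = store.list_file("/etc/hosts").await?;
        let only = files.next().await.expect("one entry for a plain file")?;
        println!("{} is {} bytes", only.path, only.size);
        assert!(files.next().await.is_none());
        Ok(())
    }
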
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
new file mode 100644
index 0000000..fd25fd4
--- /dev/null
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The Object Store abstracts access to an underlying file/object storage system.
+
+pub mod local;
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use futures::{AsyncRead, Stream};
+
+use local::LocalFileSystem;
+
+use crate::error::{DataFusionError, Result};
+use chrono::Utc;
+
+/// Object reader for one file in an object store
+#[async_trait]
+pub trait ObjectReader {
+    /// Get a reader for a chunk of the file that starts at `start` and spans `length` bytes, asynchronously
+    async fn chunk_reader(&self, start: u64, length: usize)
+        -> Result<Arc<dyn AsyncRead>>;
+
+    /// Get the length of the file
+    fn length(&self) -> u64;
+}
+
+/// Represents a file or a prefix that may require further resolution
+#[derive(Debug)]
+pub enum ListEntry {
+    /// File metadata
+    FileMeta(FileMeta),
+    /// Prefix to be further resolved during partition discovery
+    Prefix(String),
+}
+
+/// File metadata obtained from an object store
+#[derive(Debug)]
+pub struct FileMeta {
+    /// Path of the file
+    pub path: String,
+    /// Last time the file was modified in UTC
+    pub last_modified: Option<chrono::DateTime<Utc>>,
+    /// Total size of the file, in bytes
+    pub size: u64,
+}
+
+/// Stream of file metadata listed from an object store
+pub type FileMetaStream =
+    Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
+
+/// Stream of list entries obtained from an object store
+pub type ListEntryStream =
+    Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
+
+/// An `ObjectStore` abstracts access to an underlying file/object storage system.
+/// It maps strings (e.g. URLs, filesystem paths, etc.) to sources of bytes.
+#[async_trait]
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns all the files in path `prefix`
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
+
+    /// Returns all the files in `prefix` if `prefix` is already a leaf directory,
+    /// or all paths between `prefix` and the first occurrence of `delimiter` when a delimiter is provided.
+    async fn list_dir(
+        &self,
+        prefix: &str,
+        delimiter: Option<String>,
+    ) -> Result<ListEntryStream>;
+
+    /// Get an object reader for one file
+    fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>>;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+/// A Registry holds all the object stores at runtime with a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serves list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The [`LocalFileSystem`] store is registered by default to support reading local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store was already registered for the same scheme, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write().unwrap();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for the given scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read().unwrap();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the URI based on its scheme. For example:
+    /// a URI with scheme `file` or no scheme will return the default local file system store,
+    /// while a URI with scheme `s3` will return the S3 store if one is registered.
+    pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
+        if let Some((scheme, _)) = uri.split_once(':') {
+            let stores = self.object_stores.read().unwrap();
+            stores
+                .get(&*scheme.to_lowercase())
+                .map(Clone::clone)
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "No suitable object store found for {}",
+                        scheme
+                    ))
+                })
+        } else {
+            Ok(Arc::new(LocalFileSystem))
+        }
+    }
+}
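
To show what an additional backend would have to provide, here is a hedged sketch (not in this commit) of a custom store wired through the registry. `InMemoryStore`, its behaviour, and the "mem" scheme are all hypothetical; a real S3 or HDFS store would replace the bodies below:

    use std::sync::Arc;

    use async_trait::async_trait;
    use futures::stream;

    use datafusion::datasource::object_store::{
        FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
        ObjectStoreRegistry,
    };
    use datafusion::error::{DataFusionError, Result};

    #[derive(Debug)]
    struct InMemoryStore;

    #[async_trait]
    impl ObjectStore for InMemoryStore {
        async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
            // A single zero-length entry keeps the sketch self-contained.
            let meta = FileMeta {
                path: prefix.to_owned(),
                last_modified: None,
                size: 0,
            };
            Ok(Box::pin(stream::once(async move { Ok(meta) })))
        }

        async fn list_dir(
            &self,
            _prefix: &str,
            _delimiter: Option<String>,
        ) -> Result<ListEntryStream> {
            Err(DataFusionError::NotImplemented(
                "list_dir is not supported by this sketch".to_string(),
            ))
        }

        fn file_reader(&self, _file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
            Err(DataFusionError::NotImplemented(
                "file_reader is not supported by this sketch".to_string(),
            ))
        }
    }

    fn resolve_by_uri() -> Result<Arc<dyn ObjectStore>> {
        let registry = ObjectStoreRegistry::new();
        registry.register_store("mem".to_string(), Arc::new(InMemoryStore));
        // The scheme before ':' selects the store; a path with no scheme
        // falls back to the local file system.
        registry.get_by_uri("mem://some/illustrative/path")
    }
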
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index da6de04..cbb2e73 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -49,6 +49,7 @@ use crate::catalog::{
     ResolvedTableReference, TableReference,
 };
 use crate::datasource::csv::CsvFile;
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
 use crate::datasource::parquet::ParquetTable;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
@@ -164,6 +165,7 @@ impl ExecutionContext {
                 aggregate_functions: HashMap::new(),
                 config,
                 execution_props: ExecutionProps::new(),
+                object_store_registry: Arc::new(ObjectStoreRegistry::new()),
             })),
         }
     }
@@ -363,6 +365,29 @@ impl ExecutionContext {
         self.state.lock().unwrap().catalog_list.catalog(name)
     }
 
+    /// Registers an object store under the given scheme using a custom `ObjectStore` so that
+    /// an external file system or object storage system can be used with this context.
+    ///
+    /// Returns the `ObjectStore` previously registered for this scheme, if any
+    pub fn register_object_store(
+        &self,
+        scheme: impl Into<String>,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let scheme = scheme.into();
+
+        self.state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .register_store(scheme, object_store)
+    }
+
+    /// Retrieves an `ObjectStore` instance by scheme
+    pub fn object_store(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        self.state.lock().unwrap().object_store_registry.get(scheme)
+    }
+
     /// Registers a table using a custom `TableProvider` so that
     /// it can be referenced from SQL statements executed against this
     /// context.
@@ -849,6 +874,8 @@ pub struct ExecutionContextState {
     pub config: ExecutionConfig,
     /// Execution properties
     pub execution_props: ExecutionProps,
+    /// Object stores that are registered with the context
+    pub object_store_registry: Arc<ObjectStoreRegistry>,
 }
 
 impl ExecutionProps {
@@ -876,6 +903,7 @@ impl ExecutionContextState {
             aggregate_functions: HashMap::new(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
         }
     }
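
Illustrative only: wiring a custom store into a context through the two methods added above. `my_store` stands for any `Arc<dyn ObjectStore>` (for example the hypothetical in-memory store sketched earlier), and "s3" is just an example scheme:

    use std::sync::Arc;

    use datafusion::datasource::object_store::ObjectStore;
    use datafusion::execution::context::ExecutionContext;

    fn register_example(ctx: &ExecutionContext, my_store: Arc<dyn ObjectStore>) {
        // Returns the store previously registered under the same scheme, if any.
        let previous = ctx.register_object_store("s3", my_store);
        assert!(previous.is_none());

        // The store can later be looked up by scheme from the same context.
        assert!(ctx.object_store("s3").is_some());

        // The local file system is available out of the box under "file".
        assert!(ctx.object_store("file").is_some());
    }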