You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/07/14 12:51:53 UTC
[arrow-datafusion] branch master updated: Remove datafusion-data-access crate (#2904)
This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9401d6d80 Remove datafusion-data-access crate (#2904)
9401d6d80 is described below
commit 9401d6d80bfdd4e02869f7722efdd02a733de272
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 14 20:51:46 2022 +0800
Remove datafusion-data-access crate (#2904)
Co-authored-by: yangzhong <ya...@ebay.com>
---
Cargo.toml | 1 -
datafusion/data-access/Cargo.toml | 41 ---
datafusion/data-access/README.md | 28 --
datafusion/data-access/src/lib.rs | 77 -----
datafusion/data-access/src/object_store/local.rs | 368 -----------------------
datafusion/data-access/src/object_store/mod.rs | 87 ------
6 files changed, 602 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 7a349735b..ab3f427e4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,6 @@
members = [
"datafusion/common",
"datafusion/core",
- "datafusion/data-access",
"datafusion/expr",
"datafusion/jit",
"datafusion/optimizer",
diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml
deleted file mode 100644
index a0ffcb743..000000000
--- a/datafusion/data-access/Cargo.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "datafusion-data-access"
-description = "General data access layer currently mainly based on the object store interfaces"
-version = "10.0.0"
-homepage = "https://github.com/apache/arrow-datafusion"
-repository = "https://github.com/apache/arrow-datafusion"
-readme = "README.md"
-authors = ["Apache Arrow <de...@arrow.apache.org>"]
-license = "Apache-2.0"
-keywords = ["arrow", "query", "sql"]
-edition = "2021"
-rust-version = "1.59"
-
-[lib]
-name = "datafusion_data_access"
-path = "src/lib.rs"
-
-[dependencies]
-async-trait = "0.1.41"
-chrono = { version = "0.4", default-features = false, features = ["std"] }
-futures = "0.3"
-parking_lot = "0.12"
-tempfile = "3"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/datafusion/data-access/README.md b/datafusion/data-access/README.md
deleted file mode 100644
index 526603f69..000000000
--- a/datafusion/data-access/README.md
+++ /dev/null
@@ -1,28 +0,0 @@
-<!---
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# DataFusion Data Access Layer
-
-[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
-
-This crate is a submodule of DataFusion that provides an `async` API for accessing data, either remotely or locally.
-Currently, it is based on the object store interfaces. In the future, this module may include interfaces for accessing
-databases, or streaming data.
-
-[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/data-access/src/lib.rs b/datafusion/data-access/src/lib.rs
deleted file mode 100644
index 6e47be8ae..000000000
--- a/datafusion/data-access/src/lib.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod object_store;
-
-use chrono::{DateTime, Utc};
-use std::{io, result};
-
-/// Result type for operations that could result in an io error
-pub type Result<T> = result::Result<T, io::Error>;
-
-/// Represents a specific file or a prefix (folder) that may
-/// require further resolution
-#[derive(Debug)]
-pub enum ListEntry {
- /// Specific file with metadata
- FileMeta(FileMeta),
- /// Prefix to be further resolved during partition discovery
- Prefix(String),
-}
-
-/// The path and size of the file.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct SizedFile {
- /// Path of the file. It is relative to the current object
- /// store (it does not specify the `xx://` scheme).
- pub path: String,
- /// File size in total
- pub size: u64,
-}
-
-/// Description of a file as returned by the listing command of a
-/// given object store. The resulting path is relative to the
-/// object store that generated it.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct FileMeta {
- /// The path and size of the file.
- pub sized_file: SizedFile,
- /// The last modification time of the file according to the
- /// object store metadata. This information might be used by
- /// catalog systems like Delta Lake for time travel (see
- /// <https://github.com/delta-io/delta/issues/192>)
- pub last_modified: Option<DateTime<Utc>>,
-}
-
-impl FileMeta {
- /// The path that describes this file. It is relative to the
- /// associated object store.
- pub fn path(&self) -> &str {
- &self.sized_file.path
- }
-
- /// The size of the file.
- pub fn size(&self) -> u64 {
- self.sized_file.size
- }
-}
-
-impl std::fmt::Display for FileMeta {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{} (size: {})", self.path(), self.size())
- }
-}
diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs
deleted file mode 100644
index 4134aa81f..000000000
--- a/datafusion/data-access/src/object_store/local.rs
+++ /dev/null
@@ -1,368 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object store that represents the Local File System.
-
-use std::fs::{self, File, Metadata};
-use std::io;
-use std::io::{BufReader, Read, Seek, SeekFrom};
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-use super::{
- FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
-};
-
-pub static LOCAL_SCHEME: &str = "file";
-
-#[derive(Debug)]
-/// Local File System as Object Store.
-pub struct LocalFileSystem;
-
-#[async_trait]
-impl ObjectStore for LocalFileSystem {
- async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
- let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
- path
- } else {
- prefix
- };
- list_all(prefix.to_owned()).await
- }
-
- async fn list_dir(
- &self,
- prefix: &str,
- delimiter: Option<String>,
- ) -> Result<ListEntryStream> {
- if let Some(d) = delimiter {
- if d != "/" && d != "\\" {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- format!("delimiter not supported on local filesystem: {}", d),
- ));
- }
- let mut entry_stream = tokio::fs::read_dir(prefix).await?;
-
- let list_entries = stream::poll_fn(move |cx| {
- entry_stream.poll_next_entry(cx).map(|res| match res {
- Ok(Some(x)) => Some(Ok(x)),
- Ok(None) => None,
- Err(err) => Some(Err(err)),
- })
- })
- .then(|entry| async {
- let entry = entry?;
- let entry = if entry.file_type().await?.is_dir() {
- ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
- } else {
- ListEntry::FileMeta(get_meta(
- path_as_str(&entry.path())?.to_string(),
- entry.metadata().await?,
- ))
- };
- Ok(entry)
- });
-
- Ok(Box::pin(list_entries))
- } else {
- Ok(Box::pin(
- self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
- ))
- }
- }
-
- fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
- Ok(Arc::new(LocalFileReader::new(file)?))
- }
-}
-
-/// Try to convert a `Path` reference into a `&str`
-pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
- path.to_str().ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- format!("Invalid path '{}'", path.display()),
- )
- })
-}
-
-struct LocalFileReader {
- file: SizedFile,
-}
-
-impl LocalFileReader {
- fn new(file: SizedFile) -> Result<Self> {
- Ok(Self { file })
- }
-}
-
-#[async_trait]
-impl ObjectReader for LocalFileReader {
- async fn chunk_reader(
- &self,
- _start: u64,
- _length: usize,
- ) -> Result<Box<dyn AsyncRead>> {
- todo!(
- "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
- )
- }
-
- fn sync_chunk_reader(
- &self,
- start: u64,
- length: usize,
- ) -> Result<Box<dyn Read + Send + Sync>> {
- // A new file descriptor is opened for each chunk reader.
- // This is okay because chunks are usually fairly large.
- let mut file = File::open(&self.file.path)?;
- file.seek(SeekFrom::Start(start))?;
-
- let file = BufReader::new(file.take(length as u64));
-
- Ok(Box::new(file))
- }
-
- fn length(&self) -> u64 {
- self.file.size
- }
-}
-
-fn get_meta(path: String, metadata: Metadata) -> FileMeta {
- FileMeta {
- sized_file: SizedFile {
- path,
- size: metadata.len(),
- },
- last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
- }
-}
-
-async fn list_all(prefix: String) -> Result<FileMetaStream> {
- async fn find_files_in_dir(
- path: String,
- to_visit: &mut Vec<String>,
- ) -> Result<Vec<FileMeta>> {
- let mut dir = tokio::fs::read_dir(path).await?;
- let mut files = Vec::new();
-
- while let Some(child) = dir.next_entry().await? {
- let child_path = path_as_str(&child.path())?.to_string();
- let metadata = child.metadata().await?;
- if metadata.is_dir() {
- to_visit.push(child_path.to_string());
- } else {
- files.push(get_meta(child_path.to_owned(), metadata))
- }
- }
- files.sort_by(|a, b| a.path().cmp(b.path()));
- Ok(files)
- }
-
- let prefix_meta = tokio::fs::metadata(&prefix).await?;
- let prefix = prefix.to_owned();
- if prefix_meta.is_file() {
- Ok(Box::pin(stream::once(async move {
- Ok(get_meta(prefix, prefix_meta))
- })))
- } else {
- let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
- match to_visit.pop() {
- None => None,
- Some(path) => {
- let file_stream = match find_files_in_dir(path, &mut to_visit).await {
- Ok(files) => stream::iter(files).map(Ok).left_stream(),
- Err(e) => stream::once(async { Err(e) }).right_stream(),
- };
-
- Some((file_stream, to_visit))
- }
- }
- })
- .flatten();
- Ok(Box::pin(result))
- }
-}
-
-/// Create a stream of `ObjectReader` by converting each file in the `files` vector
-/// into instances of `LocalFileReader`
-pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
- Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
-}
-
-/// Helper method to convert a file location to a `LocalFileReader`
-pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
- LocalFileSystem
- .file_reader(local_unpartitioned_file(file).sized_file)
- .expect("File not found")
-}
-
-/// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> FileMeta {
- let metadata = fs::metadata(&file).expect("Local file metadata");
- FileMeta {
- sized_file: SizedFile {
- size: metadata.len(),
- path: file,
- },
- last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::ListEntry;
-
- use super::*;
- use futures::StreamExt;
- use std::collections::HashSet;
- use std::fs::create_dir;
- use std::fs::File;
- use tempfile::tempdir;
-
- #[tokio::test]
- async fn test_recursive_listing() -> Result<()> {
- // tmp/a.txt
- // tmp/x/b.txt
- // tmp/y/c.txt
- let tmp = tempdir()?;
- let x_path = tmp.path().join("x");
- let y_path = tmp.path().join("y");
- let a_path = tmp.path().join("a.txt");
- let b_path = x_path.join("b.txt");
- let c_path = y_path.join("c.txt");
- create_dir(&x_path)?;
- create_dir(&y_path)?;
- File::create(&a_path)?;
- File::create(&b_path)?;
- File::create(&c_path)?;
-
- let mut all_files = HashSet::new();
- let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
- while let Some(file) = files.next().await {
- let file = file?;
- assert_eq!(file.size(), 0);
- all_files.insert(file.path().to_owned());
- }
-
- assert_eq!(all_files.len(), 3);
- assert!(all_files.contains(a_path.to_str().unwrap()));
- assert!(all_files.contains(b_path.to_str().unwrap()));
- assert!(all_files.contains(c_path.to_str().unwrap()));
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_list_dir() -> Result<()> {
- // tmp/a.txt
- // tmp/x/b.txt
- let tmp = tempdir()?;
- let x_path = tmp.path().join("x");
- let a_path = tmp.path().join("a.txt");
- let b_path = x_path.join("b.txt");
- create_dir(&x_path)?;
- File::create(&a_path)?;
- File::create(&b_path)?;
-
- fn get_path(entry: ListEntry) -> String {
- match entry {
- ListEntry::FileMeta(f) => f.sized_file.path,
- ListEntry::Prefix(path) => path,
- }
- }
-
- async fn assert_equal_paths(
- expected: Vec<&std::path::PathBuf>,
- actual: ListEntryStream,
- ) -> Result<()> {
- let expected: HashSet<String> = expected
- .iter()
- .map(|x| x.to_str().unwrap().to_string())
- .collect();
- let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
- assert_eq!(expected, actual);
- Ok(())
- }
-
- // Providing no delimiter means recursive file listing
- let files = LocalFileSystem
- .list_dir(tmp.path().to_str().unwrap(), None)
- .await?;
- assert_equal_paths(vec![&a_path, &b_path], files).await?;
-
- // Providing slash as delimiter means list immediate files and directories
- let files = LocalFileSystem
- .list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
- .await?;
- assert_equal_paths(vec![&a_path, &x_path], files).await?;
-
- Ok(())
- }
- #[tokio::test]
- async fn test_list_all_sort_by_filename() -> Result<()> {
- // tmp/file_23590.parquet
- // tmp/file_13690.parquet
- // tmp/file_12590.parquet
- // tmp/file_03590.parquet
- let tmp = tempdir()?;
- let a_path = tmp.path().join("file_23590.parquet");
- let b_path = tmp.path().join("file_13690.parquet");
- let c_path = tmp.path().join("file_12590.parquet");
- let d_path = tmp.path().join("file_03590.parquet");
- File::create(&a_path)?;
- File::create(&b_path)?;
- File::create(&c_path)?;
- File::create(&d_path)?;
-
- let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
- let mut list_files_name = Vec::new();
- while let Some(file) = files.next().await {
- let file = file?;
- list_files_name.push(file.path().to_owned());
- }
- let sort_files_name = [
- tmp.path()
- .join("file_03590.parquet")
- .to_str()
- .unwrap()
- .to_string(),
- tmp.path()
- .join("file_12590.parquet")
- .to_str()
- .unwrap()
- .to_string(),
- tmp.path()
- .join("file_13690.parquet")
- .to_str()
- .unwrap()
- .to_string(),
- tmp.path()
- .join("file_23590.parquet")
- .to_str()
- .unwrap()
- .to_string(),
- ];
- assert_eq!(list_files_name, sort_files_name);
- Ok(())
- }
-}
diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs
deleted file mode 100644
index 496a5494f..000000000
--- a/datafusion/data-access/src/object_store/mod.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object Store abstracts access to an underlying file/object storage.
-
-pub mod local;
-
-use std::fmt::Debug;
-use std::io::Read;
-use std::pin::Pin;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{AsyncRead, Stream};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-/// Stream of files listed from object store
-pub type FileMetaStream =
- Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
-
-/// Stream of list entries obtained from object store
-pub type ListEntryStream =
- Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
-
-/// Stream readers opened on a given object store
-pub type ObjectReaderStream =
- Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
-
-/// Object Reader for one file in an object store.
-///
-/// Note that the dynamic dispatch on the reader might
-/// have some performance impacts.
-#[async_trait]
-pub trait ObjectReader: Send + Sync {
- /// Get reader for a part [start, start + length) in the file asynchronously
- async fn chunk_reader(&self, start: u64, length: usize)
- -> Result<Box<dyn AsyncRead>>;
-
- /// Get reader for a part [start, start + length) in the file
- fn sync_chunk_reader(
- &self,
- start: u64,
- length: usize,
- ) -> Result<Box<dyn Read + Send + Sync>>;
-
- /// Get reader for the entire file
- fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
- self.sync_chunk_reader(0, self.length() as usize)
- }
-
- /// Get the size of the file
- fn length(&self) -> u64;
-}
-
-/// An ObjectStore abstracts access to an underlying file/object storage.
-/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
-#[async_trait]
-pub trait ObjectStore: Sync + Send + Debug {
- /// Returns all the files in path `prefix`
- async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
-
- /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
- /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
- async fn list_dir(
- &self,
- prefix: &str,
- delimiter: Option<String>,
- ) -> Result<ListEntryStream>;
-
- /// Get object reader for one file
- fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
-}