You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/07/14 12:51:53 UTC

[arrow-datafusion] branch master updated: Remove datafusion-data-access crate (#2904)

This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9401d6d80 Remove datafusion-data-access crate (#2904)
9401d6d80 is described below

commit 9401d6d80bfdd4e02869f7722efdd02a733de272
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 14 20:51:46 2022 +0800

    Remove datafusion-data-access crate (#2904)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 Cargo.toml                                       |   1 -
 datafusion/data-access/Cargo.toml                |  41 ---
 datafusion/data-access/README.md                 |  28 --
 datafusion/data-access/src/lib.rs                |  77 -----
 datafusion/data-access/src/object_store/local.rs | 368 -----------------------
 datafusion/data-access/src/object_store/mod.rs   |  87 ------
 6 files changed, 602 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7a349735b..ab3f427e4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,6 @@
 members = [
     "datafusion/common",
     "datafusion/core",
-    "datafusion/data-access",
     "datafusion/expr",
     "datafusion/jit",
     "datafusion/optimizer",
diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml
deleted file mode 100644
index a0ffcb743..000000000
--- a/datafusion/data-access/Cargo.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "datafusion-data-access"
-description = "General data access layer currently mainly based on the object store interfaces"
-version = "10.0.0"
-homepage = "https://github.com/apache/arrow-datafusion"
-repository = "https://github.com/apache/arrow-datafusion"
-readme = "README.md"
-authors = ["Apache Arrow <de...@arrow.apache.org>"]
-license = "Apache-2.0"
-keywords = ["arrow", "query", "sql"]
-edition = "2021"
-rust-version = "1.59"
-
-[lib]
-name = "datafusion_data_access"
-path = "src/lib.rs"
-
-[dependencies]
-async-trait = "0.1.41"
-chrono = { version = "0.4", default-features = false, features = ["std"] }
-futures = "0.3"
-parking_lot = "0.12"
-tempfile = "3"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/datafusion/data-access/README.md b/datafusion/data-access/README.md
deleted file mode 100644
index 526603f69..000000000
--- a/datafusion/data-access/README.md
+++ /dev/null
@@ -1,28 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# DataFusion Data Access Layer
-
-[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
-
-This crate is a submodule of DataFusion that provides an `async` API for accessing data, either remotely or locally.
-Currently, it is based on the object store interfaces. In the future, this module may include interfaces for accessing
-databases, or streaming data.
-
-[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/data-access/src/lib.rs b/datafusion/data-access/src/lib.rs
deleted file mode 100644
index 6e47be8ae..000000000
--- a/datafusion/data-access/src/lib.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod object_store;
-
-use chrono::{DateTime, Utc};
-use std::{io, result};
-
-/// Result type for operations that could result in an io error
-pub type Result<T> = result::Result<T, io::Error>;
-
-/// Represents a specific file or a prefix (folder) that may
-/// require further resolution
-#[derive(Debug)]
-pub enum ListEntry {
-    /// Specific file with metadata
-    FileMeta(FileMeta),
-    /// Prefix to be further resolved during partition discovery
-    Prefix(String),
-}
-
-/// The path and size of the file.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct SizedFile {
-    /// Path of the file. It is relative to the current object
-    /// store (it does not specify the `xx://` scheme).
-    pub path: String,
-    /// File size in total
-    pub size: u64,
-}
-
-/// Description of a file as returned by the listing command of a
-/// given object store. The resulting path is relative to the
-/// object store that generated it.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct FileMeta {
-    /// The path and size of the file.
-    pub sized_file: SizedFile,
-    /// The last modification time of the file according to the
-    /// object store metadata. This information might be used by
-    /// catalog systems like Delta Lake for time travel (see
-    /// <https://github.com/delta-io/delta/issues/192>)
-    pub last_modified: Option<DateTime<Utc>>,
-}
-
-impl FileMeta {
-    /// The path that describes this file. It is relative to the
-    /// associated object store.
-    pub fn path(&self) -> &str {
-        &self.sized_file.path
-    }
-
-    /// The size of the file.
-    pub fn size(&self) -> u64 {
-        self.sized_file.size
-    }
-}
-
-impl std::fmt::Display for FileMeta {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{} (size: {})", self.path(), self.size())
-    }
-}
diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs
deleted file mode 100644
index 4134aa81f..000000000
--- a/datafusion/data-access/src/object_store/local.rs
+++ /dev/null
@@ -1,368 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object store that represents the Local File System.
-
-use std::fs::{self, File, Metadata};
-use std::io;
-use std::io::{BufReader, Read, Seek, SeekFrom};
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-use super::{
-    FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
-};
-
-pub static LOCAL_SCHEME: &str = "file";
-
-#[derive(Debug)]
-/// Local File System as Object Store.
-pub struct LocalFileSystem;
-
-#[async_trait]
-impl ObjectStore for LocalFileSystem {
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
-        let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
-            path
-        } else {
-            prefix
-        };
-        list_all(prefix.to_owned()).await
-    }
-
-    async fn list_dir(
-        &self,
-        prefix: &str,
-        delimiter: Option<String>,
-    ) -> Result<ListEntryStream> {
-        if let Some(d) = delimiter {
-            if d != "/" && d != "\\" {
-                return Err(std::io::Error::new(
-                    std::io::ErrorKind::InvalidInput,
-                    format!("delimiter not supported on local filesystem: {}", d),
-                ));
-            }
-            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
-
-            let list_entries = stream::poll_fn(move |cx| {
-                entry_stream.poll_next_entry(cx).map(|res| match res {
-                    Ok(Some(x)) => Some(Ok(x)),
-                    Ok(None) => None,
-                    Err(err) => Some(Err(err)),
-                })
-            })
-            .then(|entry| async {
-                let entry = entry?;
-                let entry = if entry.file_type().await?.is_dir() {
-                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
-                } else {
-                    ListEntry::FileMeta(get_meta(
-                        path_as_str(&entry.path())?.to_string(),
-                        entry.metadata().await?,
-                    ))
-                };
-                Ok(entry)
-            });
-
-            Ok(Box::pin(list_entries))
-        } else {
-            Ok(Box::pin(
-                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
-            ))
-        }
-    }
-
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
-        Ok(Arc::new(LocalFileReader::new(file)?))
-    }
-}
-
-/// Try to convert a `Path` reference into a `&str`
-pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
-    path.to_str().ok_or_else(|| {
-        io::Error::new(
-            io::ErrorKind::InvalidInput,
-            format!("Invalid path '{}'", path.display()),
-        )
-    })
-}
-
-struct LocalFileReader {
-    file: SizedFile,
-}
-
-impl LocalFileReader {
-    fn new(file: SizedFile) -> Result<Self> {
-        Ok(Self { file })
-    }
-}
-
-#[async_trait]
-impl ObjectReader for LocalFileReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
-        todo!(
-            "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
-        )
-    }
-
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
-        // A new file descriptor is opened for each chunk reader.
-        // This is okay because chunks are usually fairly large.
-        let mut file = File::open(&self.file.path)?;
-        file.seek(SeekFrom::Start(start))?;
-
-        let file = BufReader::new(file.take(length as u64));
-
-        Ok(Box::new(file))
-    }
-
-    fn length(&self) -> u64 {
-        self.file.size
-    }
-}
-
-fn get_meta(path: String, metadata: Metadata) -> FileMeta {
-    FileMeta {
-        sized_file: SizedFile {
-            path,
-            size: metadata.len(),
-        },
-        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
-    }
-}
-
-async fn list_all(prefix: String) -> Result<FileMetaStream> {
-    async fn find_files_in_dir(
-        path: String,
-        to_visit: &mut Vec<String>,
-    ) -> Result<Vec<FileMeta>> {
-        let mut dir = tokio::fs::read_dir(path).await?;
-        let mut files = Vec::new();
-
-        while let Some(child) = dir.next_entry().await? {
-            let child_path = path_as_str(&child.path())?.to_string();
-            let metadata = child.metadata().await?;
-            if metadata.is_dir() {
-                to_visit.push(child_path.to_string());
-            } else {
-                files.push(get_meta(child_path.to_owned(), metadata))
-            }
-        }
-        files.sort_by(|a, b| a.path().cmp(b.path()));
-        Ok(files)
-    }
-
-    let prefix_meta = tokio::fs::metadata(&prefix).await?;
-    let prefix = prefix.to_owned();
-    if prefix_meta.is_file() {
-        Ok(Box::pin(stream::once(async move {
-            Ok(get_meta(prefix, prefix_meta))
-        })))
-    } else {
-        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
-            match to_visit.pop() {
-                None => None,
-                Some(path) => {
-                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
-                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
-                        Err(e) => stream::once(async { Err(e) }).right_stream(),
-                    };
-
-                    Some((file_stream, to_visit))
-                }
-            }
-        })
-        .flatten();
-        Ok(Box::pin(result))
-    }
-}
-
-/// Create a stream of `ObjectReader` by converting each file in the `files` vector
-/// into instances of `LocalFileReader`
-pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
-    Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
-}
-
-/// Helper method to convert a file location to a `LocalFileReader`
-pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
-    LocalFileSystem
-        .file_reader(local_unpartitioned_file(file).sized_file)
-        .expect("File not found")
-}
-
-/// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> FileMeta {
-    let metadata = fs::metadata(&file).expect("Local file metadata");
-    FileMeta {
-        sized_file: SizedFile {
-            size: metadata.len(),
-            path: file,
-        },
-        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::ListEntry;
-
-    use super::*;
-    use futures::StreamExt;
-    use std::collections::HashSet;
-    use std::fs::create_dir;
-    use std::fs::File;
-    use tempfile::tempdir;
-
-    #[tokio::test]
-    async fn test_recursive_listing() -> Result<()> {
-        // tmp/a.txt
-        // tmp/x/b.txt
-        // tmp/y/c.txt
-        let tmp = tempdir()?;
-        let x_path = tmp.path().join("x");
-        let y_path = tmp.path().join("y");
-        let a_path = tmp.path().join("a.txt");
-        let b_path = x_path.join("b.txt");
-        let c_path = y_path.join("c.txt");
-        create_dir(&x_path)?;
-        create_dir(&y_path)?;
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-        File::create(&c_path)?;
-
-        let mut all_files = HashSet::new();
-        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
-        while let Some(file) = files.next().await {
-            let file = file?;
-            assert_eq!(file.size(), 0);
-            all_files.insert(file.path().to_owned());
-        }
-
-        assert_eq!(all_files.len(), 3);
-        assert!(all_files.contains(a_path.to_str().unwrap()));
-        assert!(all_files.contains(b_path.to_str().unwrap()));
-        assert!(all_files.contains(c_path.to_str().unwrap()));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_list_dir() -> Result<()> {
-        // tmp/a.txt
-        // tmp/x/b.txt
-        let tmp = tempdir()?;
-        let x_path = tmp.path().join("x");
-        let a_path = tmp.path().join("a.txt");
-        let b_path = x_path.join("b.txt");
-        create_dir(&x_path)?;
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-
-        fn get_path(entry: ListEntry) -> String {
-            match entry {
-                ListEntry::FileMeta(f) => f.sized_file.path,
-                ListEntry::Prefix(path) => path,
-            }
-        }
-
-        async fn assert_equal_paths(
-            expected: Vec<&std::path::PathBuf>,
-            actual: ListEntryStream,
-        ) -> Result<()> {
-            let expected: HashSet<String> = expected
-                .iter()
-                .map(|x| x.to_str().unwrap().to_string())
-                .collect();
-            let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
-            assert_eq!(expected, actual);
-            Ok(())
-        }
-
-        // Providing no delimiter means recursive file listing
-        let files = LocalFileSystem
-            .list_dir(tmp.path().to_str().unwrap(), None)
-            .await?;
-        assert_equal_paths(vec![&a_path, &b_path], files).await?;
-
-        // Providing slash as delimiter means list immediate files and directories
-        let files = LocalFileSystem
-            .list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
-            .await?;
-        assert_equal_paths(vec![&a_path, &x_path], files).await?;
-
-        Ok(())
-    }
-    #[tokio::test]
-    async fn test_list_all_sort_by_filename() -> Result<()> {
-        // tmp/file_23590.parquet
-        // tmp/file_13690.parquet
-        // tmp/file_12590.parquet
-        // tmp/file_03590.parquet
-        let tmp = tempdir()?;
-        let a_path = tmp.path().join("file_23590.parquet");
-        let b_path = tmp.path().join("file_13690.parquet");
-        let c_path = tmp.path().join("file_12590.parquet");
-        let d_path = tmp.path().join("file_03590.parquet");
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-        File::create(&c_path)?;
-        File::create(&d_path)?;
-
-        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
-        let mut list_files_name = Vec::new();
-        while let Some(file) = files.next().await {
-            let file = file?;
-            list_files_name.push(file.path().to_owned());
-        }
-        let sort_files_name = [
-            tmp.path()
-                .join("file_03590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_12590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_13690.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_23590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-        ];
-        assert_eq!(list_files_name, sort_files_name);
-        Ok(())
-    }
-}
diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs
deleted file mode 100644
index 496a5494f..000000000
--- a/datafusion/data-access/src/object_store/mod.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object Store abstracts access to an underlying file/object storage.
-
-pub mod local;
-
-use std::fmt::Debug;
-use std::io::Read;
-use std::pin::Pin;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{AsyncRead, Stream};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-/// Stream of files listed from object store
-pub type FileMetaStream =
-    Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
-
-/// Stream of list entries obtained from object store
-pub type ListEntryStream =
-    Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
-
-/// Stream readers opened on a given object store
-pub type ObjectReaderStream =
-    Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
-
-/// Object Reader for one file in an object store.
-///
-/// Note that the dynamic dispatch on the reader might
-/// have some performance impacts.
-#[async_trait]
-pub trait ObjectReader: Send + Sync {
-    /// Get reader for a part [start, start + length) in the file asynchronously
-    async fn chunk_reader(&self, start: u64, length: usize)
-        -> Result<Box<dyn AsyncRead>>;
-
-    /// Get reader for a part [start, start + length) in the file
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>>;
-
-    /// Get reader for the entire file
-    fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
-        self.sync_chunk_reader(0, self.length() as usize)
-    }
-
-    /// Get the size of the file
-    fn length(&self) -> u64;
-}
-
-/// An ObjectStore abstracts access to an underlying file/object storage.
-/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
-#[async_trait]
-pub trait ObjectStore: Sync + Send + Debug {
-    /// Returns all the files in path `prefix`
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
-
-    /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
-    /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
-    async fn list_dir(
-        &self,
-        prefix: &str,
-        delimiter: Option<String>,
-    ) -> Result<ListEntryStream>;
-
-    /// Get object reader for one file
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
-}