You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/09 18:03:43 UTC
[arrow-datafusion] branch master updated: Add support for list_dir() on local fs (#2467)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b53bf8a34 Add support for list_dir() on local fs (#2467)
b53bf8a34 is described below
commit b53bf8a34a6484f3b0502efb1ba4d9399324a9a9
Author: Will Jones <wi...@gmail.com>
AuthorDate: Mon May 9 11:03:35 2022 -0700
Add support for list_dir() on local fs (#2467)
* Add support for list_dir on local fs
* Format
* Print invalid values in errors
* Update data-access/src/object_store/local.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Use ok_or_else
* Format
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
data-access/Cargo.toml | 2 +-
data-access/src/object_store/local.rs | 137 +++++++++++++++++++++++++++-------
2 files changed, 113 insertions(+), 26 deletions(-)
diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml
index 951d74167..83b2ae883 100644
--- a/data-access/Cargo.toml
+++ b/data-access/Cargo.toml
@@ -34,7 +34,7 @@ path = "src/lib.rs"
[dependencies]
async-trait = "0.1.41"
-chrono = { version = "0.4", default-features = false }
+chrono = { version = "0.4", default-features = false, features = ["std"] }
futures = "0.3"
glob = "0.3.0"
parking_lot = "0.12"
diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs
index 118f20564..604539814 100644
--- a/data-access/src/object_store/local.rs
+++ b/data-access/src/object_store/local.rs
@@ -23,9 +23,9 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
-use crate::{FileMeta, Result, SizedFile};
+use crate::{FileMeta, ListEntry, Result, SizedFile};
use super::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
@@ -50,10 +50,44 @@ impl ObjectStore for LocalFileSystem {
async fn list_dir(
&self,
- _prefix: &str,
- _delimiter: Option<String>,
+ prefix: &str,
+ delimiter: Option<String>,
) -> Result<ListEntryStream> {
- todo!()
+ if let Some(d) = delimiter {
+ if d != "/" && d != "\\" {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ format!("delimiter not supported on local filesystem: {}", d),
+ ));
+ }
+ let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+ let list_entries = stream::poll_fn(move |cx| {
+ entry_stream.poll_next_entry(cx).map(|res| match res {
+ Ok(Some(x)) => Some(Ok(x)),
+ Ok(None) => None,
+ Err(err) => Some(Err(err)),
+ })
+ })
+ .then(|entry| async {
+ let entry = entry?;
+ let entry = if entry.file_type().await?.is_dir() {
+ ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+ } else {
+ ListEntry::FileMeta(get_meta(
+ path_as_str(&entry.path())?.to_string(),
+ entry.metadata().await?,
+ ))
+ };
+ Ok(entry)
+ });
+
+ Ok(Box::pin(list_entries))
+ } else {
+ Ok(Box::pin(
+ self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+ ))
+ }
}
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
@@ -61,6 +95,16 @@ impl ObjectStore for LocalFileSystem {
}
}
+/// Try to convert a `Path` reference into a `&str`, returning an `InvalidInput` error if the path is not valid Unicode
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+ path.to_str().ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Invalid path '{}'", path.display()),
+ )
+ })
+}
+
struct LocalFileReader {
file: SizedFile,
}
@@ -103,17 +147,17 @@ impl ObjectReader for LocalFileReader {
}
}
-async fn list_all(prefix: String) -> Result<FileMetaStream> {
- fn get_meta(path: String, metadata: Metadata) -> FileMeta {
- FileMeta {
- sized_file: SizedFile {
- path,
- size: metadata.len(),
- },
- last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
- }
+fn get_meta(path: String, metadata: Metadata) -> FileMeta {
+ FileMeta {
+ sized_file: SizedFile {
+ path,
+ size: metadata.len(),
+ },
+ last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
}
+}
+async fn list_all(prefix: String) -> Result<FileMetaStream> {
async fn find_files_in_dir(
path: String,
to_visit: &mut Vec<String>,
@@ -122,18 +166,12 @@ async fn list_all(prefix: String) -> Result<FileMetaStream> {
let mut files = Vec::new();
while let Some(child) = dir.next_entry().await? {
- if let Some(child_path) = child.path().to_str() {
- let metadata = child.metadata().await?;
- if metadata.is_dir() {
- to_visit.push(child_path.to_string());
- } else {
- files.push(get_meta(child_path.to_owned(), metadata))
- }
+ let child_path = path_as_str(&child.path())?.to_string();
+ let metadata = child.metadata().await?;
+ if metadata.is_dir() {
+ to_visit.push(child_path.to_string());
} else {
- return Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- "Invalid path".to_string(),
- ));
+ files.push(get_meta(child_path.to_owned(), metadata))
}
}
Ok(files)
@@ -191,6 +229,8 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta {
#[cfg(test)]
mod tests {
+ use crate::ListEntry;
+
use super::*;
use futures::StreamExt;
use std::collections::HashSet;
@@ -231,6 +271,53 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_list_dir() -> Result<()> {
+ // tmp/a.txt
+ // tmp/x/b.txt
+ let tmp = tempdir()?;
+ let x_path = tmp.path().join("x");
+ let a_path = tmp.path().join("a.txt");
+ let b_path = x_path.join("b.txt");
+ create_dir(&x_path)?;
+ File::create(&a_path)?;
+ File::create(&b_path)?;
+
+ fn get_path(entry: ListEntry) -> String {
+ match entry {
+ ListEntry::FileMeta(f) => f.sized_file.path,
+ ListEntry::Prefix(path) => path,
+ }
+ }
+
+ async fn assert_equal_paths(
+ expected: Vec<&std::path::PathBuf>,
+ actual: ListEntryStream,
+ ) -> Result<()> {
+ let expected: HashSet<String> = expected
+ .iter()
+ .map(|x| x.to_str().unwrap().to_string())
+ .collect();
+ let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
+ assert_eq!(expected, actual);
+ Ok(())
+ }
+
+ // Providing no delimiter means recursive file listing
+ let files = LocalFileSystem
+ .list_dir(tmp.path().to_str().unwrap(), None)
+ .await?;
+ assert_equal_paths(vec![&a_path, &b_path], files).await?;
+
+ // Providing a slash as the delimiter lists only the immediate files and directories
+ let files = LocalFileSystem
+ .list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
+ .await?;
+ assert_equal_paths(vec![&a_path, &x_path], files).await?;
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_globbing() -> Result<()> {
let tmp = tempdir()?;