You are viewing a plain-text version of this content. The canonical link for it appeared here as a hyperlink, which this plain-text copy does not preserve.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/09 18:03:43 UTC

[arrow-datafusion] branch master updated: Add support for list_dir() on local fs (#2467)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b53bf8a34 Add support for list_dir() on local fs (#2467)
b53bf8a34 is described below

commit b53bf8a34a6484f3b0502efb1ba4d9399324a9a9
Author: Will Jones <wi...@gmail.com>
AuthorDate: Mon May 9 11:03:35 2022 -0700

    Add support for list_dir() on local fs (#2467)
    
    * Add support for list_dir on local fs
    
    * Format
    
    * Print invalid values in errors
    
    * Update data-access/src/object_store/local.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Use ok_or_else
    
    * Format
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 data-access/Cargo.toml                |   2 +-
 data-access/src/object_store/local.rs | 137 +++++++++++++++++++++++++++-------
 2 files changed, 113 insertions(+), 26 deletions(-)

diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml
index 951d74167..83b2ae883 100644
--- a/data-access/Cargo.toml
+++ b/data-access/Cargo.toml
@@ -34,7 +34,7 @@ path = "src/lib.rs"
 
 [dependencies]
 async-trait = "0.1.41"
-chrono = { version = "0.4", default-features = false }
+chrono = { version = "0.4", default-features = false, features = ["std"] }
 futures = "0.3"
 glob = "0.3.0"
 parking_lot = "0.12"
diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs
index 118f20564..604539814 100644
--- a/data-access/src/object_store/local.rs
+++ b/data-access/src/object_store/local.rs
@@ -23,9 +23,9 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
 
-use crate::{FileMeta, Result, SizedFile};
+use crate::{FileMeta, ListEntry, Result, SizedFile};
 
 use super::{
     FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
@@ -50,10 +50,44 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
@@ -61,6 +95,16 @@ impl ObjectStore for LocalFileSystem {
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    path.to_str().ok_or_else(|| {
+        io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid path '{}'", path.display()),
+        )
+    })
+}
+
 struct LocalFileReader {
     file: SizedFile,
 }
@@ -103,17 +147,17 @@ impl ObjectReader for LocalFileReader {
     }
 }
 
-async fn list_all(prefix: String) -> Result<FileMetaStream> {
-    fn get_meta(path: String, metadata: Metadata) -> FileMeta {
-        FileMeta {
-            sized_file: SizedFile {
-                path,
-                size: metadata.len(),
-            },
-            last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
-        }
+fn get_meta(path: String, metadata: Metadata) -> FileMeta {
+    FileMeta {
+        sized_file: SizedFile {
+            path,
+            size: metadata.len(),
+        },
+        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
     }
+}
 
+async fn list_all(prefix: String) -> Result<FileMetaStream> {
     async fn find_files_in_dir(
         path: String,
         to_visit: &mut Vec<String>,
@@ -122,18 +166,12 @@ async fn list_all(prefix: String) -> Result<FileMetaStream> {
         let mut files = Vec::new();
 
         while let Some(child) = dir.next_entry().await? {
-            if let Some(child_path) = child.path().to_str() {
-                let metadata = child.metadata().await?;
-                if metadata.is_dir() {
-                    to_visit.push(child_path.to_string());
-                } else {
-                    files.push(get_meta(child_path.to_owned(), metadata))
-                }
+            let child_path = path_as_str(&child.path())?.to_string();
+            let metadata = child.metadata().await?;
+            if metadata.is_dir() {
+                to_visit.push(child_path.to_string());
             } else {
-                return Err(io::Error::new(
-                    io::ErrorKind::InvalidInput,
-                    "Invalid path".to_string(),
-                ));
+                files.push(get_meta(child_path.to_owned(), metadata))
             }
         }
         Ok(files)
@@ -191,6 +229,8 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta {
 
 #[cfg(test)]
 mod tests {
+    use crate::ListEntry;
+
     use super::*;
     use futures::StreamExt;
     use std::collections::HashSet;
@@ -231,6 +271,53 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_dir() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        create_dir(&x_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+
+        fn get_path(entry: ListEntry) -> String {
+            match entry {
+                ListEntry::FileMeta(f) => f.sized_file.path,
+                ListEntry::Prefix(path) => path,
+            }
+        }
+
+        async fn assert_equal_paths(
+            expected: Vec<&std::path::PathBuf>,
+            actual: ListEntryStream,
+        ) -> Result<()> {
+            let expected: HashSet<String> = expected
+                .iter()
+                .map(|x| x.to_str().unwrap().to_string())
+                .collect();
+            let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
+            assert_eq!(expected, actual);
+            Ok(())
+        }
+
+        // Providing no delimiter means recursive file listing
+        let files = LocalFileSystem
+            .list_dir(tmp.path().to_str().unwrap(), None)
+            .await?;
+        assert_equal_paths(vec![&a_path, &b_path], files).await?;
+
+        // Providing slash as delimiter means list immediate files and directories
+        let files = LocalFileSystem
+            .list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
+            .await?;
+        assert_equal_paths(vec![&a_path, &x_path], files).await?;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_globbing() -> Result<()> {
         let tmp = tempdir()?;