You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/06 16:59:14 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2467: Add support for list_dir() on local fs

alamb commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r867009509


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {

Review Comment:
   I think you may be able to use https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html here to `collect` the stream
   
   Maybe something like (untested):
   
   ```rust
   let list_entries = entry_stream
     .collect::<Result<Vec<_>>>()
     .await?
     .then(...);
   ```



##########
data-access/src/object_store/local.rs:
##########
@@ -231,6 +273,53 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_dir() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        create_dir(&x_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+
+        fn get_path(entry: ListEntry) -> String {
+            match entry {
+                ListEntry::FileMeta(f) => f.sized_file.path,
+                ListEntry::Prefix(path) => path,
+            }
+        }
+
+        async fn assert_equal_paths(
+            expected: Vec<&std::path::PathBuf>,
+            actual: ListEntryStream,
+        ) -> Result<()> {
+            let expected: HashSet<String> = expected

Review Comment:
   I think the comparison should also include the order -- if you use `HashSet` then paths like `/foo/bar` and `/bar/foo` will look like they are equal, right?



##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         Ok(Arc::new(LocalFileReader::new(file)?))
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    if let Some(path) = path.to_str() {
+        Ok(path)
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid path: {}", path.display()),
+        ))
+    }

Review Comment:
   You can write this more "idiomatically" like:
   
   ```suggestion
       path.to_str().ok_or_else(|| {
           io::Error::new(
               io::ErrorKind::InvalidInput,
               format!("Invalid path '{}'", path.display()),
           )
       })
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org