Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/06 02:26:52 UTC

[GitHub] [arrow-datafusion] wjones127 opened a new pull request, #2467: Add support for list_dir() on local fs

wjones127 opened a new pull request, #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467

   # Which issue does this PR close?
   
   Closes #2466.
   
   # Rationale for this change
   
   This simply implements `list_dir()` for the local filesystem. I don't know of any other `ObjectStore` that implements it yet (it is also left as a TODO for S3 and HDFS), but it seems likely to be useful.
   
   # What changes are included in this PR?
   
   * Adds implementation of `list_dir()` for `LocalFileSystem`, with associated tests.
   * Adds a convenience function `path_as_str()`, which was helpful internally; I also added it to the public API since it's likely helpful for users as well.
   
   # Are there any user-facing changes?
   
   All user-facing changes are new functionality; nothing breaks existing functionality.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467




[GitHub] [arrow-datafusion] wjones127 commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
wjones127 commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r866455994


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "delimiter not supported on local filesystem",
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         Ok(Arc::new(LocalFileReader::new(file)?))
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    if let Some(child_path) = path.to_str() {
+        Ok(child_path)
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Invalid path".to_string(),

Review Comment:
   Good point.





[GitHub] [arrow-datafusion] wjones127 commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
wjones127 commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r867415984


##########
data-access/src/object_store/local.rs:
##########
@@ -231,6 +273,53 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_dir() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        create_dir(&x_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+
+        fn get_path(entry: ListEntry) -> String {
+            match entry {
+                ListEntry::FileMeta(f) => f.sized_file.path,
+                ListEntry::Prefix(path) => path,
+            }
+        }
+
+        async fn assert_equal_paths(
+            expected: Vec<&std::path::PathBuf>,
+            actual: ListEntryStream,
+        ) -> Result<()> {
+            let expected: HashSet<String> = expected

Review Comment:
   Sorry, maybe this function name is misleading; it's comparing sets of paths not a single path. So the order within a path is preserved, but not the order those paths may be listed in.
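
To make the distinction above concrete, here is a minimal sketch of a set-based path comparison. The helper name `same_paths` is hypothetical and is not the test helper from the PR; it only illustrates that each path is compared as a whole string, so `/foo/bar` and `/bar/foo` remain distinct while the listing order is ignored.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

/// Returns true when both listings contain the same paths,
/// regardless of the order in which the paths were listed.
/// (Hypothetical helper for illustration; not the PR's code.)
fn same_paths(expected: &[PathBuf], actual: &[String]) -> bool {
    // Each path becomes one whole string, so its components keep their order.
    let expected: HashSet<String> = expected
        .iter()
        .map(|p| p.to_string_lossy().into_owned())
        .collect();
    let actual: HashSet<String> = actual.iter().cloned().collect();
    // Set equality ignores listing order but not the contents of each path.
    expected == actual
}
```

With this shape, a listing that returns the same entries in a different order still compares equal, while a listing containing `/bar/foo` instead of `/foo/bar` does not.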





[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r866453626


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "delimiter not supported on local filesystem",

Review Comment:
   Also would be nice to include the unsupported delimiter in this message





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r867009509


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {

Review Comment:
   I think you may be able to use https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html here to `collect` the stream
   
   Maybe something like (untested):
   
   ```rust
   let list_entries = entry_stream
     .collect::<Result<Vec<_>>>()
     .await?
     .then(...);
   ```



##########
data-access/src/object_store/local.rs:
##########
@@ -231,6 +273,53 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_dir() -> Result<()> {
+        // tmp/a.txt
+        // tmp/x/b.txt
+        let tmp = tempdir()?;
+        let x_path = tmp.path().join("x");
+        let a_path = tmp.path().join("a.txt");
+        let b_path = x_path.join("b.txt");
+        create_dir(&x_path)?;
+        File::create(&a_path)?;
+        File::create(&b_path)?;
+
+        fn get_path(entry: ListEntry) -> String {
+            match entry {
+                ListEntry::FileMeta(f) => f.sized_file.path,
+                ListEntry::Prefix(path) => path,
+            }
+        }
+
+        async fn assert_equal_paths(
+            expected: Vec<&std::path::PathBuf>,
+            actual: ListEntryStream,
+        ) -> Result<()> {
+            let expected: HashSet<String> = expected

Review Comment:
   I think the comparison should also include the order -- if you use `HashSet` then paths like `/foo/bar` and `/bar/foo` will look like they are equal, right?



##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         Ok(Arc::new(LocalFileReader::new(file)?))
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    if let Some(path) = path.to_str() {
+        Ok(path)
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid path: {}", path.display()),
+        ))
+    }

Review Comment:
   You can write this more "idiomatically" like:
   
   ```suggestion
       path.to_str()
         .map_err(|e| io::Error::new(
               io::ErrorKind::InvalidInput,
               format!("Invalid path '{}': {}", path.display(), e),
           )))
   ```





[GitHub] [arrow-datafusion] alamb commented on pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#issuecomment-1121411728

   Thanks again @wjones127 




[GitHub] [arrow-datafusion] wjones127 commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
wjones127 commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r867415428


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {

Review Comment:
   I could, but was attempting to not collect, just in case the caller didn't want to collect all results in memory. It's definitely more verbose as a result though.
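
The trade-off described above can be sketched with a synchronous analogy. This is not the PR's async code: it assumes `std::fs::read_dir` in place of `tokio::fs::read_dir`, and `Entry` and `list_dir_lazy` are hypothetical stand-ins for `ListEntry` and `list_dir()`. The point is only that the caller receives a lazy sequence and decides whether to collect it.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Simplified stand-in for the PR's `ListEntry` (hypothetical).
#[derive(Debug, PartialEq)]
enum Entry {
    Prefix(PathBuf),
    File(PathBuf),
}

/// Lazily list a directory: an entry is only read from the OS
/// when the caller pulls the next item from the iterator.
fn list_dir_lazy(prefix: &Path) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
    let entries = fs::read_dir(prefix)?;
    Ok(entries.map(|entry| -> io::Result<Entry> {
        let entry = entry?;
        // Directories become prefixes; everything else is treated as a file.
        if entry.file_type()?.is_dir() {
            Ok(Entry::Prefix(entry.path()))
        } else {
            Ok(Entry::File(entry.path()))
        }
    }))
}
```

A caller that wants everything in memory can still call `.collect::<io::Result<Vec<_>>>()` on the result, whereas a caller that only needs the first few entries never pays for the rest.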





[GitHub] [arrow-datafusion] wjones127 commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
wjones127 commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r867528044


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    format!("delimiter not supported on local filesystem: {}", d),
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         Ok(Arc::new(LocalFileReader::new(file)?))
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    if let Some(path) = path.to_str() {
+        Ok(path)
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid path: {}", path.display()),
+        ))
+    }

Review Comment:
   to_str() returns an Option though, not a Result. But I can use `ok_or_else()` for a similar effect.
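
For reference, a minimal sketch of what the `ok_or_else()` version might look like. It assumes the crate's `Result` alias is `std::io::Result`, which the diff suggests but does not show, and it may differ from what was finally merged.

```rust
use std::io;
use std::path::Path;

/// Try to convert a `Path` into a `&str`, failing if it is not valid UTF-8.
pub fn path_as_str(path: &Path) -> io::Result<&str> {
    // `to_str()` returns `Option<&str>`; `ok_or_else` turns `None` into an
    // error and only builds the error message when it is actually needed.
    path.to_str().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid path: {}", path.display()),
        )
    })
}
```

Because `to_str()` yields an `Option`, there is no underlying error value to include in the message, which is why the closure takes no argument.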





[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#discussion_r866453099


##########
data-access/src/object_store/local.rs:
##########
@@ -50,17 +50,63 @@ impl ObjectStore for LocalFileSystem {
 
     async fn list_dir(
         &self,
-        _prefix: &str,
-        _delimiter: Option<String>,
+        prefix: &str,
+        delimiter: Option<String>,
     ) -> Result<ListEntryStream> {
-        todo!()
+        if let Some(d) = delimiter {
+            if d != "/" && d != "\\" {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "delimiter not supported on local filesystem",
+                ));
+            }
+            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
+
+            let list_entries = stream::poll_fn(move |cx| {
+                entry_stream.poll_next_entry(cx).map(|res| match res {
+                    Ok(Some(x)) => Some(Ok(x)),
+                    Ok(None) => None,
+                    Err(err) => Some(Err(err)),
+                })
+            })
+            .then(|entry| async {
+                let entry = entry?;
+                let entry = if entry.file_type().await?.is_dir() {
+                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
+                } else {
+                    ListEntry::FileMeta(get_meta(
+                        path_as_str(&entry.path())?.to_string(),
+                        entry.metadata().await?,
+                    ))
+                };
+                Ok(entry)
+            });
+
+            Ok(Box::pin(list_entries))
+        } else {
+            Ok(Box::pin(
+                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
+            ))
+        }
     }
 
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         Ok(Arc::new(LocalFileReader::new(file)?))
     }
 }
 
+/// Try to convert a PathBuf reference into a &str
+pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
+    if let Some(child_path) = path.to_str() {
+        Ok(child_path)
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Invalid path".to_string(),

Review Comment:
   It would be nice to include the path in the error message





[GitHub] [arrow-datafusion] alamb commented on pull request #2467: Add support for list_dir() on local fs

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2467:
URL: https://github.com/apache/arrow-datafusion/pull/2467#issuecomment-1120991030

   Looks like there is a lint failure (need to run `cargo fmt`) but then this PR should be good to go
   
   https://github.com/apache/arrow-datafusion/runs/6342706726?check_suite_focus=true

