You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/02 14:01:12 UTC

[arrow-rs] branch master updated: Handle symlinks in LocalFileSystem (#2206) (#2269)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ad65e8865 Handle symlinks in LocalFileSystem (#2206) (#2269)
ad65e8865 is described below

commit ad65e886548dce876c93eb784cd9dfb688dc96e4
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Aug 2 15:01:07 2022 +0100

    Handle symlinks in LocalFileSystem (#2206) (#2269)
    
    * Handle symlinks in LocalFileSystem (#2206)
    
    * Update object_store/src/local.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 object_store/src/local.rs    | 169 +++++++++++++++++++++++++++++++++++++++----
 object_store/src/path/mod.rs |  22 ++----
 2 files changed, 160 insertions(+), 31 deletions(-)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index e2f133e84..c3f54e0c6 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -68,56 +68,56 @@ pub(crate) enum Error {
     #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
     UnableToCreateDir {
         source: io::Error,
-        path: std::path::PathBuf,
+        path: PathBuf,
     },
 
     #[snafu(display("Unable to create file {}: {}", path.display(), err))]
     UnableToCreateFile {
-        path: std::path::PathBuf,
+        path: PathBuf,
         err: io::Error,
     },
 
     #[snafu(display("Unable to delete file {}: {}", path.display(), source))]
     UnableToDeleteFile {
         source: io::Error,
-        path: std::path::PathBuf,
+        path: PathBuf,
     },
 
     #[snafu(display("Unable to open file {}: {}", path.display(), source))]
     UnableToOpenFile {
         source: io::Error,
-        path: std::path::PathBuf,
+        path: PathBuf,
     },
 
     #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
     UnableToReadBytes {
         source: io::Error,
-        path: std::path::PathBuf,
+        path: PathBuf,
     },
 
     #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
     OutOfRange {
-        path: std::path::PathBuf,
+        path: PathBuf,
         expected: usize,
         actual: usize,
     },
 
     #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
     UnableToCopyFile {
-        from: std::path::PathBuf,
-        to: std::path::PathBuf,
+        from: PathBuf,
+        to: PathBuf,
         source: io::Error,
     },
 
     NotFound {
-        path: std::path::PathBuf,
+        path: PathBuf,
         source: io::Error,
     },
 
     #[snafu(display("Error seeking file {}: {}", path.display(), source))]
     Seek {
         source: io::Error,
-        path: std::path::PathBuf,
+        path: PathBuf,
     },
 
     #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
@@ -170,6 +170,17 @@ impl From<Error> for super::Error {
 ///
 /// If not called from a tokio context, this will perform IO on the current thread with
 /// no additional complexity or overheads
+///
+/// # Symlinks
+///
+/// [`LocalFileSystem`] will follow symlinks as normal; however, it is worth noting:
+///
+/// * Broken symlinks will be silently ignored by listing operations
+/// * No effort is made to prevent breaking symlinks when deleting files
+/// * Symlinks that resolve to paths outside the root **will** be followed
+/// * Mutating a file through one or more symlinks will mutate the underlying file
+/// * Deleting a path that resolves to a symlink will only delete the symlink
+///
 #[derive(Debug)]
 pub struct LocalFileSystem {
     config: Arc<Config>,
@@ -214,10 +225,13 @@ impl LocalFileSystem {
 
 impl Config {
     /// Return filesystem path of the given location
-    fn path_to_filesystem(&self, location: &Path) -> Result<std::path::PathBuf> {
+    fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
         let mut url = self.root.clone();
         url.path_segments_mut()
             .expect("url path")
+            // technically not necessary as Path ignores empty segments
+            // but avoids creating paths with "//" which look odd in error messages.
+            .pop_if_empty()
             .extend(location.parts());
 
         url.to_file_path()
@@ -371,7 +385,8 @@ impl ObjectStore for LocalFileSystem {
 
         let walkdir = WalkDir::new(&root_path)
             // Don't include the root directory itself
-            .min_depth(1);
+            .min_depth(1)
+            .follow_links(true);
 
         let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
             match convert_walkdir_result(result_dir_entry) {
@@ -433,7 +448,10 @@ impl ObjectStore for LocalFileSystem {
         let resolved_prefix = config.path_to_filesystem(&prefix)?;
 
         maybe_spawn_blocking(move || {
-            let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1);
+            let walkdir = WalkDir::new(&resolved_prefix)
+                .min_depth(1)
+                .max_depth(1)
+                .follow_links(true);
 
             let mut common_prefixes = BTreeSet::new();
             let mut objects = Vec::new();
@@ -732,7 +750,7 @@ impl AsyncWrite for LocalUpload {
     }
 }
 
-fn open_file(path: &std::path::PathBuf) -> Result<File> {
+fn open_file(path: &PathBuf) -> Result<File> {
     let file = File::open(path).map_err(|e| {
         if e.kind() == std::io::ErrorKind::NotFound {
             Error::NotFound {
@@ -749,7 +767,7 @@ fn open_file(path: &std::path::PathBuf) -> Result<File> {
     Ok(file)
 }
 
-fn open_writable_file(path: &std::path::PathBuf) -> Result<File> {
+fn open_writable_file(path: &PathBuf) -> Result<File> {
     match File::create(&path) {
         Ok(f) => Ok(f),
         Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
@@ -861,7 +879,8 @@ mod tests {
         },
         Error as ObjectStoreError, ObjectStore,
     };
-    use tempfile::TempDir;
+    use futures::TryStreamExt;
+    use tempfile::{NamedTempFile, TempDir};
     use tokio::io::AsyncWriteExt;
 
     #[tokio::test]
@@ -1030,6 +1049,124 @@ mod tests {
         }
     }
 
+    async fn check_list(
+        integration: &LocalFileSystem,
+        prefix: Option<&Path>,
+        expected: &[&str],
+    ) {
+        let result: Vec<_> = integration
+            .list(prefix)
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
+        strings.sort_unstable();
+        assert_eq!(&strings, expected)
+    }
+
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    async fn test_symlink() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+        let subdir = root.path().join("a");
+        std::fs::create_dir(&subdir).unwrap();
+        let file = subdir.join("file.parquet");
+        std::fs::write(file, "test").unwrap();
+
+        check_list(&integration, None, &["a/file.parquet"]).await;
+        integration
+            .head(&Path::from("a/file.parquet"))
+            .await
+            .unwrap();
+
+        // Follow out of tree symlink
+        let other = NamedTempFile::new().unwrap();
+        std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet"))
+            .unwrap();
+
+        // Should return test.parquet even though out of tree
+        check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;
+
+        // Can fetch test.parquet
+        integration.head(&Path::from("test.parquet")).await.unwrap();
+
+        // Follow in tree symlink
+        std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
+        check_list(
+            &integration,
+            None,
+            &["a/file.parquet", "b/file.parquet", "test.parquet"],
+        )
+        .await;
+        check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;
+
+        // Can fetch through symlink
+        integration
+            .head(&Path::from("b/file.parquet"))
+            .await
+            .unwrap();
+
+        // Ignore broken symlink
+        std::os::unix::fs::symlink(
+            root.path().join("foo.parquet"),
+            root.path().join("c"),
+        )
+        .unwrap();
+
+        check_list(
+            &integration,
+            None,
+            &["a/file.parquet", "b/file.parquet", "test.parquet"],
+        )
+        .await;
+
+        let mut r = integration.list_with_delimiter(None).await.unwrap();
+        r.common_prefixes.sort_unstable();
+        assert_eq!(r.common_prefixes.len(), 2);
+        assert_eq!(r.common_prefixes[0].as_ref(), "a");
+        assert_eq!(r.common_prefixes[1].as_ref(), "b");
+        assert_eq!(r.objects.len(), 1);
+        assert_eq!(r.objects[0].location.as_ref(), "test.parquet");
+
+        let r = integration
+            .list_with_delimiter(Some(&Path::from("a")))
+            .await
+            .unwrap();
+        assert_eq!(r.common_prefixes.len(), 0);
+        assert_eq!(r.objects.len(), 1);
+        assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");
+
+        // Deleting a symlink doesn't delete the source file
+        integration
+            .delete(&Path::from("test.parquet"))
+            .await
+            .unwrap();
+        assert!(other.path().exists());
+
+        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
+
+        // Deleting through a symlink removes the underlying file, so it disappears from both paths
+        integration
+            .delete(&Path::from("b/file.parquet"))
+            .await
+            .unwrap();
+
+        check_list(&integration, None, &[]).await;
+
+        // Adding a file through a symlink makes it visible under both paths
+        integration
+            .put(&Path::from("b/file.parquet"), Bytes::from(vec![0, 1, 2]))
+            .await
+            .unwrap();
+
+        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
+    }
+
     #[tokio::test]
     async fn invalid_path() {
         let root = TempDir::new().unwrap();
diff --git a/object_store/src/path/mod.rs b/object_store/src/path/mod.rs
index 23488ef66..38b7eb3e0 100644
--- a/object_store/src/path/mod.rs
+++ b/object_store/src/path/mod.rs
@@ -163,7 +163,7 @@ impl Path {
 
     /// Convert a filesystem path to a [`Path`] relative to the filesystem root
     ///
-    /// This will return an error if the path does not exist, or contains illegal
+    /// This will return an error if the path contains illegal
     /// character sequences as defined by [`Path::parse`]
     pub fn from_filesystem_path(
         path: impl AsRef<std::path::Path>,
@@ -173,9 +173,8 @@ impl Path {
 
     /// Convert a filesystem path to a [`Path`] relative to the provided base
     ///
-    /// This will return an error if the path does not exist on the local filesystem,
-    /// contains illegal character sequences as defined by [`Path::parse`], or `base`
-    /// does not refer to a parent path of `path`
+    /// This will return an error if the path contains illegal character sequences
+    /// as defined by [`Path::parse`], or `base` does not refer to a parent path of `path`
     pub(crate) fn from_filesystem_path_with_base(
         path: impl AsRef<std::path::Path>,
         base: Option<&Url>,
@@ -295,20 +294,13 @@ where
     }
 }
 
-/// Given a filesystem path, convert it to its canonical URL representation,
-/// returning an error if the file doesn't exist on the local filesystem
+/// Given a filesystem path, convert it to a URL representation
 pub(crate) fn filesystem_path_to_url(
     path: impl AsRef<std::path::Path>,
 ) -> Result<Url, Error> {
-    let path = path.as_ref().canonicalize().context(CanonicalizeSnafu {
-        path: path.as_ref(),
-    })?;
-
-    match path.is_dir() {
-        true => Url::from_directory_path(&path),
-        false => Url::from_file_path(&path),
-    }
-    .map_err(|_| Error::InvalidPath { path })
+    Url::from_file_path(&path).map_err(|_| Error::InvalidPath {
+        path: path.as_ref().into(),
+    })
 }
 
 #[cfg(test)]