You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/06 17:21:29 UTC

[arrow-datafusion] branch master updated: Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b70da5479 Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)
b70da5479 is described below

commit b70da5479fab45e7d8e6c8ec1579ffc653eb160e
Author: Tim Van Wassenhove <gi...@timvw.be>
AuthorDate: Fri May 6 17:21:23 2022 +0000

    Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)
    
    * reproduce and fix issue
    
    * fix reference reported by clippy
---
 data-access/src/object_store/mod.rs      | 41 +++++++++++++++++++++++++++++---
 datafusion/core/src/execution/context.rs | 19 +++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs
index 39d1bf04d..93a930a6d 100644
--- a/data-access/src/object_store/mod.rs
+++ b/data-access/src/object_store/mod.rs
@@ -22,7 +22,7 @@ pub mod local;
 use std::fmt::Debug;
 use std::io::Read;
 use std::path;
-use std::path::{Path, PathBuf};
+use std::path::{Component, Path, PathBuf};
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -92,9 +92,13 @@ pub trait ObjectStore: Sync + Send + Debug {
         if !contains_glob_start_char(glob_pattern) {
             self.list_file(glob_pattern).await
         } else {
-            let start_path = find_longest_search_path_without_glob_pattern(glob_pattern);
+            let normalized_glob_pb = normalize_path(Path::new(glob_pattern));
+            let normalized_glob_pattern =
+                normalized_glob_pb.as_os_str().to_str().unwrap();
+            let start_path =
+                find_longest_search_path_without_glob_pattern(normalized_glob_pattern);
             let file_stream = self.list_file(&start_path).await?;
-            let pattern = Pattern::new(glob_pattern).unwrap();
+            let pattern = Pattern::new(normalized_glob_pattern).unwrap();
             Ok(Box::pin(file_stream.filter(move |fr| {
                 let matches_pattern = match fr {
                     Ok(f) => pattern.matches(f.path()),
@@ -134,6 +138,31 @@ pub trait ObjectStore: Sync + Send + Debug {
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
 }
 
+/// Lexically normalize a path (resolving `..` against earlier components) without touching the filesystem, unlike `std::fs::canonicalize`; a trailing separator is kept
+pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
+    let ends_with_slash = path // `components()` drops a trailing separator, so remember it here and restore it below
+        .as_ref()
+        .to_str()
+        .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR)); // NOTE(review): only MAIN_SEPARATOR is checked, so a trailing '/' is not detected on Windows
+    let mut normalized = PathBuf::new();
+    for component in path.as_ref().components() { // `components()` already ignores repeated separators such as `//` (issue #2465) and non-leading `.`
+        match &component {
+            Component::ParentDir => {
+                if !normalized.pop() { // NOTE(review): if the last kept component is itself `..`, pop() removes it, so "../../x" becomes "x" — TODO confirm and push `..` in that case instead
+                    normalized.push(component); // nothing to pop (empty path, or a root such as "/"): keep the `..`
+                }
+            }
+            _ => {
+                normalized.push(component); // RootDir, Prefix, CurDir and Normal components are kept unchanged
+            }
+        }
+    }
+    if ends_with_slash {
+        normalized.push(""); // pushing an empty component appends a trailing separator
+    }
+    normalized
+}
+
 const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
 
 /// Determine whether the path contains a globbing character
@@ -240,6 +269,12 @@ mod tests {
             "/a/b/**/c*.txt",
             &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
         );
+        test_longest_base_path(
+            &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), // https://github.com/apache/arrow-datafusion/issues/2465
+            &format!(
+                "{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}"
+            ),
+        );
         Ok(())
     }
 }
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index d7aa8397d..3a680e529 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2159,6 +2159,25 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_with_glob_path_issue_2465() -> Result<()> { // regression test for https://github.com/apache/arrow-datafusion/issues/2465
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_parquet(
+                // issue #2465: when a glob path contained `//` (two consecutive separators), no files were found;
+                // the "/..//*/" segment below guarantees a `//` regardless of the value of parquet_test_data()
+                format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
+                ParquetReadOptions::default(),
+            )
+            .await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // expected matches: alltypes_plain.parquet (8 rows) + alltypes_plain.snappy.parquet (2 rows); alltypes_dictionary.parquet does not match `alltypes_plain*`
+        assert_eq!(total_rows, 10);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_from_registered_table_with_glob_path() -> Result<()> {
         let ctx = SessionContext::new();