You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/06 17:21:29 UTC
[arrow-datafusion] branch master updated: Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b70da5479 Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)
b70da5479 is described below
commit b70da5479fab45e7d8e6c8ec1579ffc653eb160e
Author: Tim Van Wassenhove <gi...@timvw.be>
AuthorDate: Fri May 6 17:21:23 2022 +0000
Fix `read_from_registered_table_with_glob_path` fails if path contains // #2465 (#2468)
* reproduce and fix issue
* fix reference reported by clippy
---
data-access/src/object_store/mod.rs | 41 +++++++++++++++++++++++++++++---
datafusion/core/src/execution/context.rs | 19 +++++++++++++++
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs
index 39d1bf04d..93a930a6d 100644
--- a/data-access/src/object_store/mod.rs
+++ b/data-access/src/object_store/mod.rs
@@ -22,7 +22,7 @@ pub mod local;
use std::fmt::Debug;
use std::io::Read;
use std::path;
-use std::path::{Path, PathBuf};
+use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
@@ -92,9 +92,13 @@ pub trait ObjectStore: Sync + Send + Debug {
if !contains_glob_start_char(glob_pattern) {
self.list_file(glob_pattern).await
} else {
- let start_path = find_longest_search_path_without_glob_pattern(glob_pattern);
+ let normalized_glob_pb = normalize_path(Path::new(glob_pattern));
+ let normalized_glob_pattern =
+ normalized_glob_pb.as_os_str().to_str().unwrap();
+ let start_path =
+ find_longest_search_path_without_glob_pattern(normalized_glob_pattern);
let file_stream = self.list_file(&start_path).await?;
- let pattern = Pattern::new(glob_pattern).unwrap();
+ let pattern = Pattern::new(normalized_glob_pattern).unwrap();
Ok(Box::pin(file_stream.filter(move |fr| {
let matches_pattern = match fr {
Ok(f) => pattern.matches(f.path()),
@@ -134,6 +138,31 @@ pub trait ObjectStore: Sync + Send + Debug {
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
}
+/// Lexically normalize a path (resolve `..`, collapse repeated separators) without requiring it to exist on the filesystem, unlike `Path::canonicalize`
+pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
+ let ends_with_slash = path // remember a trailing separator; `components()` does not report it
+ .as_ref()
+ .to_str() // `None` for non-UTF-8 paths, which then count as having no trailing separator
+ .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR));
+ let mut normalized = PathBuf::new();
+ for component in path.as_ref().components() {
+ match &component {
+ Component::ParentDir => {
+ if !normalized.pop() { // NOTE(review): `pop()` also removes a preceding `..` or `.`, so `../../a` appears to become `a` — confirm
+ normalized.push(component); // nothing left to pop (empty or root-only path): keep the `..`
+ }
+ }
+ _ => {
+ normalized.push(component);
+ }
+ }
+ }
+ if ends_with_slash {
+ normalized.push(""); // pushing an empty component restores the trailing separator
+ }
+ normalized
+}
+
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
/// Determine whether the path contains a globbing character
@@ -240,6 +269,12 @@ mod tests {
"/a/b/**/c*.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
);
+ test_longest_base_path(
+ &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), // https://github.com/apache/arrow-datafusion/issues/2465
+ &format!(
+ "{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}"
+ ),
+ );
Ok(())
}
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index d7aa8397d..3a680e529 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2159,6 +2159,25 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn read_with_glob_path_issue_2465() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let df = ctx
+ .read_parquet(
+ // Regression test for issue #2465: when a path contained `//` (two consecutive separators), no files were found.
+ // Regardless of the value returned by parquet_test_data(), the path built here always contains a `//`.
+ format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ let results = df.collect().await?;
+ let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+ // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows; alltypes_dictionary.parquet is not matched by `alltypes_plain*`
+ assert_eq!(total_rows, 10);
+ Ok(())
+ }
+
#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();