Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/17 13:05:14 UTC
[arrow-datafusion] branch main updated: Faster ListingTable partition listing (#6182) (#6183)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9f808f4c81 Faster ListingTable partition listing (#6182) (#6183)
9f808f4c81 is described below
commit 9f808f4c81563a4813f7612430ac267351b9f005
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 17 14:05:08 2023 +0100
Faster ListingTable partition listing (#6182) (#6183)
* Faster ListingTable partition listing (#6182)
* Fix strip_prefix
* Fix strip_prefix
* Implement list_with_delimiter for MirroringObjectStore
* Use split_terminator
* Fix MirroringObjectStore::list_with_delimiter
* Fix logical conflict
* Add logs
* Limit concurrency
* Increase concurrency limit
* Review feedback
---
datafusion/core/src/datasource/listing/helpers.rs | 514 ++++++++++------------
datafusion/core/src/datasource/listing/url.rs | 46 +-
datafusion/core/tests/path_partition.rs | 83 +++-
3 files changed, 320 insertions(+), 323 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 6032c49885..7a0dd25351 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -19,37 +19,32 @@
use std::sync::Arc;
-use arrow::array::new_empty_array;
+use arrow::compute::{and, cast, prep_null_mask_filter};
use arrow::{
- array::{ArrayBuilder, ArrayRef, Date64Builder, StringBuilder, UInt64Builder},
+ array::{ArrayRef, StringBuilder},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
-use chrono::{TimeZone, Utc};
-use futures::{stream::BoxStream, TryStreamExt};
-use log::debug;
+use arrow_array::cast::AsArray;
+use arrow_array::Array;
+use arrow_schema::Fields;
+use futures::stream::FuturesUnordered;
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use log::{debug, trace};
-use crate::{
- datasource::MemTable, error::Result, execution::context::SessionContext,
- scalar::ScalarValue,
-};
+use crate::{error::Result, scalar::ScalarValue};
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
-use datafusion_common::{
- cast::{as_date64_array, as_string_array, as_uint64_array},
- Column, DataFusionError,
-};
+use datafusion_common::{Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::expr::ScalarUDF;
use datafusion_expr::{Expr, Volatility};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::execution_props::ExecutionProps;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
-const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
-const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
-const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";
-
/// Check whether the given expression can be resolved using only the columns `col_names`.
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
@@ -137,6 +132,9 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
is_applicable
}
+/// The maximum number of concurrent listing requests
+const CONCURRENCY_LIMIT: usize = 100;
+
/// Partition the list of files into `n` groups
pub fn split_files(
partitioned_files: Vec<PartitionedFile>,
@@ -153,225 +151,248 @@ pub fn split_files(
.collect()
}
+struct Partition {
+ /// The path to the partition, including the table prefix
+ path: Path,
+    /// The number of path segments below the table prefix that `path` contains,
+    /// or equivalently the number of partition values encoded in `path`
+ depth: usize,
+ /// The files contained as direct children of this `Partition` if known
+ files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+    /// List the direct children of this partition, updating `self.files` with
+ /// any child files, and returning a list of child "directories"
+ async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+ trace!("Listing partition {}", self.path);
+ let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
+ let result = store.list_with_delimiter(prefix).await?;
+ self.files = Some(result.objects);
+ Ok((self, result.common_prefixes))
+ }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
+async fn list_partitions(
+ store: &dyn ObjectStore,
+ table_path: &ListingTableUrl,
+ max_depth: usize,
+) -> Result<Vec<Partition>> {
+ let partition = Partition {
+ path: table_path.prefix().clone(),
+ depth: 0,
+ files: None,
+ };
+
+ let mut out = Vec::with_capacity(64);
+
+ let mut pending = vec![];
+ let mut futures = FuturesUnordered::new();
+ futures.push(partition.list(store));
+
+ while let Some((partition, paths)) = futures.next().await.transpose()? {
+ // If pending contains a future it implies prior to this iteration
+ // `futures.len == CONCURRENCY_LIMIT`. We can therefore add a single
+ // future from `pending` to the working set
+ if let Some(next) = pending.pop() {
+ futures.push(next)
+ }
+
+ let depth = partition.depth;
+ out.push(partition);
+ for path in paths {
+ let child = Partition {
+ path,
+ depth: depth + 1,
+ files: None,
+ };
+ match depth < max_depth {
+ true => match futures.len() < CONCURRENCY_LIMIT {
+ true => futures.push(child.list(store)),
+ false => pending.push(child.list(store)),
+ },
+ false => out.push(child),
+ }
+ }
+ }
+ Ok(out)
+}
+
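A minimal, self-contained sketch of the bounded-concurrency pattern used by `list_partitions` above, assuming only the `futures` crate; `do_work` and the numeric items are hypothetical stand-ins for `Partition::list` and partition paths:

    use futures::executor::block_on;
    use futures::stream::{FuturesUnordered, StreamExt};

    const CONCURRENCY_LIMIT: usize = 100;

    // Hypothetical stand-in for `Partition::list`: returns the finished item
    // and the "child partitions" it discovered.
    async fn do_work(item: usize) -> (usize, Vec<usize>) {
        let children = if item < 10 { vec![item * 2, item * 2 + 1] } else { vec![] };
        (item, children)
    }

    fn main() {
        block_on(async {
            let mut out = Vec::new();
            let mut pending = Vec::new();
            let mut futures = FuturesUnordered::new();
            futures.push(do_work(1));

            while let Some((item, children)) = futures.next().await {
                // If `pending` is non-empty the working set was full before this
                // iteration, so top it up with exactly one parked future.
                if let Some(next) = pending.pop() {
                    futures.push(next);
                }
                out.push(item);
                for child in children {
                    if futures.len() < CONCURRENCY_LIMIT {
                        futures.push(do_work(child));
                    } else {
                        pending.push(do_work(child));
                    }
                }
            }
            println!("visited {} items", out.len());
        });
    }

Parking overflow work in `pending` rather than pushing it straight into the `FuturesUnordered` is what keeps at most CONCURRENCY_LIMIT listing requests in flight at any time.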
+async fn prune_partitions(
+ table_path: &ListingTableUrl,
+ partitions: Vec<Partition>,
+ filters: &[Expr],
+ partition_cols: &[(String, DataType)],
+) -> Result<Vec<Partition>> {
+ if filters.is_empty() {
+ return Ok(partitions);
+ }
+
+ let mut builders: Vec<_> = (0..partition_cols.len())
+ .map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
+ .collect();
+
+ for partition in &partitions {
+ let cols = partition_cols.iter().map(|x| x.0.as_str());
+ let parsed = parse_partitions_for_path(table_path, &partition.path, cols)
+ .unwrap_or_default();
+
+ let mut builders = builders.iter_mut();
+ for (p, b) in parsed.iter().zip(&mut builders) {
+ b.append_value(p);
+ }
+ builders.for_each(|b| b.append_null());
+ }
+
+ let arrays = partition_cols
+ .iter()
+ .zip(builders)
+ .map(|((_, d), mut builder)| {
+ let array = builder.finish();
+ cast(&array, d)
+ })
+ .collect::<Result<_, _>>()?;
+
+ let fields: Fields = partition_cols
+ .iter()
+ .map(|(n, d)| Field::new(n, d.clone(), true))
+ .collect();
+ let schema = Arc::new(Schema::new(fields));
+
+ let df_schema = DFSchema::new_with_metadata(
+ partition_cols
+ .iter()
+ .map(|(n, d)| DFField::new_unqualified(n, d.clone(), true))
+ .collect(),
+ Default::default(),
+ )?;
+
+ let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+ // TODO: Plumb this down
+ let props = ExecutionProps::new();
+
+ // Applies `filter` to `batch` returning `None` on error
+ let do_filter = |filter| -> Option<ArrayRef> {
+ let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
+ Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
+ };
+
+    // Compute the conjunction of the filters, ignoring errors
+ let mask = filters
+ .iter()
+ .fold(None, |acc, filter| match (acc, do_filter(filter)) {
+ (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
+ (None, Some(r)) => Some(r.as_boolean().clone()),
+ (r, None) => r,
+ });
+
+ let mask = match mask {
+ Some(mask) => mask,
+ None => return Ok(partitions),
+ };
+
+ // Don't retain partitions that evaluated to null
+ let prepared = match mask.null_count() {
+ 0 => mask,
+ _ => prep_null_mask_filter(&mask),
+ };
+
+ // Sanity check
+ assert_eq!(prepared.len(), partitions.len());
+
+ let filtered = partitions
+ .into_iter()
+ .zip(prepared.values())
+ .filter_map(|(p, f)| f.then_some(p))
+ .collect();
+
+ Ok(filtered)
+}
+
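A minimal sketch of how the per-filter masks are combined above, assuming the `arrow` crate; the filter results themselves are hypothetical:

    use arrow::array::BooleanArray;
    use arrow::compute::{and, prep_null_mask_filter};

    fn main() {
        // Hypothetical per-filter results for three partitions; a None entry
        // means the filter could not be evaluated for that partition.
        let filter_a = BooleanArray::from(vec![Some(true), Some(true), None]);
        let filter_b = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);

        // Conjunction of the two masks, as in the fold above.
        let mask = and(&filter_a, &filter_b).unwrap();

        // Partitions whose mask is null must not be retained, so rewrite
        // nulls as `false` before filtering.
        let prepared = match mask.null_count() {
            0 => mask,
            _ => prep_null_mask_filter(&mask),
        };

        let keep: Vec<bool> = prepared.iter().map(|v| v.unwrap_or(false)).collect();
        assert_eq!(keep, vec![true, false, false]);
    }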
/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
-///
-/// TODO for tables with many files (10k+), it will usually more efficient
-/// to first list the folders relative to the first partition dimension,
-/// prune those, then list only the contain of the remaining folders.
pub async fn pruned_partition_list<'a>(
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
file_extension: &'a str,
- table_partition_cols: &'a [(String, DataType)],
+ partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
let list = table_path.list_all_files(store, file_extension);
// if no partition col => simply list all the files
- if table_partition_cols.is_empty() {
+ if partition_cols.is_empty() {
return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into())));
}
- let applicable_filters: Vec<_> = filters
- .iter()
- .filter(|f| {
- expr_applicable_for_cols(
- &table_partition_cols
- .iter()
- .map(|x| x.0.clone())
- .collect::<Vec<_>>(),
- f,
- )
- })
- .collect();
+ let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+ debug!("Listed {} partitions", partitions.len());
- if applicable_filters.is_empty() {
- // Parse the partition values while listing all the files
- // Note: We might avoid parsing the partition values if they are not used in any projection,
- // but the cost of parsing will likely be far dominated by the time to fetch the listing from
- // the object store.
- Ok(Box::pin(list.try_filter_map(
- move |object_meta| async move {
- let parsed_path = parse_partitions_for_path(
- table_path,
- &object_meta.location,
- &table_partition_cols
- .iter()
- .map(|x| x.0.clone())
- .collect::<Vec<_>>(),
- )
- .map(|p| {
- p.iter()
- .zip(table_partition_cols)
- .map(|(&part_value, part_column)| {
- ScalarValue::try_from_string(
- part_value.to_string(),
- &part_column.1,
- )
- .unwrap_or_else(|_| {
- panic!(
- "Failed to cast str {} to type {}",
- part_value, part_column.1
- )
- })
- })
- .collect()
- });
-
- Ok(parsed_path.map(|partition_values| PartitionedFile {
- partition_values,
- object_meta,
- range: None,
- extensions: None,
- }))
- },
- )))
- } else {
- // parse the partition values and serde them as a RecordBatch to filter them
- let metas: Vec<_> = list.try_collect().await?;
- let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
- let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
- debug!("get mem_table: {:?}", mem_table);
-
- // Filter the partitions using a local datafusion context
- // TODO having the external context would allow us to resolve `Volatility::Stable`
- // scalar functions (`ScalarFunction` & `ScalarUDF`) and `ScalarVariable`s
- let ctx = SessionContext::new();
- let mut df = ctx.read_table(Arc::new(mem_table))?;
- for filter in applicable_filters {
- df = df.filter(filter.clone())?;
- }
- let filtered_batches = df.collect().await?;
- let paths = batches_to_paths(&filtered_batches)?;
+ let pruned =
+ prune_partitions(table_path, partitions, filters, partition_cols).await?;
- Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok))))
- }
-}
+ debug!("Pruning yielded {} partitions", pruned.len());
-/// convert the paths of the files to a record batch with the following columns:
-/// - one column for the file size named `_df_part_file_size_`
-/// - one column for with the original path named `_df_part_file_path_`
-/// - one column for with the last modified date named `_df_part_file_modified_`
-/// - ... one column by partition ...
-///
-/// Note: For the last modified date, this looses precisions higher than millisecond.
-fn paths_to_batch(
- table_partition_cols: &[(String, DataType)],
- table_path: &ListingTableUrl,
- metas: &[ObjectMeta],
-) -> Result<RecordBatch> {
- let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
- let mut length_builder = UInt64Builder::with_capacity(metas.len());
- let mut modified_builder = Date64Builder::with_capacity(metas.len());
- let mut partition_scalar_values = table_partition_cols
- .iter()
- .map(|_| Vec::new())
- .collect::<Vec<_>>();
- for file_meta in metas {
- if let Some(partition_values) = parse_partitions_for_path(
- table_path,
- &file_meta.location,
- &table_partition_cols
- .iter()
- .map(|x| x.0.clone())
- .collect::<Vec<_>>(),
- ) {
- key_builder.append_value(file_meta.location.as_ref());
- length_builder.append_value(file_meta.size as u64);
- modified_builder.append_value(file_meta.last_modified.timestamp_millis());
- for (i, part_val) in partition_values.iter().enumerate() {
- let scalar_val = ScalarValue::try_from_string(
- part_val.to_string(),
- &table_partition_cols[i].1,
- )?;
- partition_scalar_values[i].push(scalar_val);
- }
- } else {
- debug!("No partitioning for path {}", file_meta.location);
- }
- }
+ let stream = futures::stream::iter(pruned)
+ .map(move |partition: Partition| async move {
+ let cols = partition_cols.iter().map(|x| x.0.as_str());
+ let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
- // finish all builders
- let mut col_arrays: Vec<ArrayRef> = vec![
- ArrayBuilder::finish(&mut key_builder),
- ArrayBuilder::finish(&mut length_builder),
- ArrayBuilder::finish(&mut modified_builder),
- ];
- for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
- if part_scalar_val.is_empty() {
- col_arrays.push(new_empty_array(&table_partition_cols[i].1));
- } else {
- let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
- col_arrays.push(partition_val_array);
- }
- }
-
- // put the schema together
- let mut fields = vec![
- Field::new(FILE_PATH_COLUMN_NAME, DataType::Utf8, false),
- Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
- Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
- ];
- for part_col in table_partition_cols {
- fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
- }
+ let partition_values = parsed
+ .into_iter()
+ .flatten()
+ .zip(partition_cols)
+ .map(|(parsed, (_, datatype))| {
+ ScalarValue::try_from_string(parsed.to_string(), datatype)
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let files = match partition.files {
+ Some(files) => files,
+ None => {
+ trace!("Recursively listing partition {}", partition.path);
+ let s = store.list(Some(&partition.path)).await?;
+ s.try_collect().await?
+ }
+ };
- let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
- Ok(batch)
-}
+ let files = files.into_iter().filter(move |o| {
+ let extension_match = o.location.as_ref().ends_with(file_extension);
+ let glob_match = table_path.contains(&o.location);
+ extension_match && glob_match
+ });
-/// convert a set of record batches created by `paths_to_batch()` back to partitioned files.
-fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
- batches
- .iter()
- .flat_map(|batch| {
- let key_array = as_string_array(batch.column(0)).unwrap();
- let length_array = as_uint64_array(batch.column(1)).unwrap();
- let modified_array = as_date64_array(batch.column(2)).unwrap();
-
- (0..batch.num_rows()).map(move |row| {
+ let stream = futures::stream::iter(files.map(move |object_meta| {
Ok(PartitionedFile {
- object_meta: ObjectMeta {
- location: Path::parse(key_array.value(row))
- .map_err(|e| DataFusionError::External(Box::new(e)))?,
- last_modified: to_timestamp_millis(modified_array.value(row))?,
- size: length_array.value(row) as usize,
- },
- partition_values: (3..batch.columns().len())
- .map(|col| {
- ScalarValue::try_from_array(batch.column(col), row).unwrap()
- })
- .collect(),
+ object_meta,
+ partition_values: partition_values.clone(),
range: None,
extensions: None,
})
- })
- })
- .collect()
-}
+ }));
-fn to_timestamp_millis(v: i64) -> Result<chrono::DateTime<Utc>> {
- match Utc.timestamp_millis_opt(v) {
- chrono::LocalResult::None => Err(DataFusionError::Execution(format!(
- "Can not convert {v} to UTC millisecond timestamp"
- ))),
- chrono::LocalResult::Single(v) => Ok(v),
- chrono::LocalResult::Ambiguous(_, _) => Err(DataFusionError::Execution(format!(
- "Ambiguous timestamp when converting {v} to UTC millisecond timestamp"
- ))),
- }
+ Ok::<_, DataFusionError>(stream)
+ })
+ .buffer_unordered(CONCURRENCY_LIMIT)
+ .try_flatten()
+ .boxed();
+ Ok(stream)
}
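A small sketch of the `buffer_unordered` + `try_flatten` streaming pattern that `pruned_partition_list` ends with, reduced to plain numbers and using only the `futures` crate; the values are hypothetical stand-ins for per-partition file listings:

    use futures::executor::block_on;
    use futures::{stream, StreamExt, TryStreamExt};

    const CONCURRENCY_LIMIT: usize = 100;

    fn main() {
        // Hypothetical partitions, each expanding into its own sub-stream of
        // "files" (plain numbers here).
        let partitions = vec![1usize, 2, 3];

        let files: Result<Vec<usize>, std::io::Error> = block_on(async {
            stream::iter(partitions)
                .map(|p| async move {
                    let children = (0..p).map(|i| Ok::<usize, std::io::Error>(p * 10 + i));
                    // Each partition yields a fallible sub-stream, like the
                    // per-partition listing stream in the real code.
                    Ok::<_, std::io::Error>(stream::iter(children))
                })
                // Expand up to CONCURRENCY_LIMIT partitions at a time...
                .buffer_unordered(CONCURRENCY_LIMIT)
                // ...and flatten the sub-streams into one stream of files.
                .try_flatten()
                .try_collect()
                .await
        });
        println!("{:?}", files.unwrap());
    }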
/// Extract the partition values for the given `file_path` (in the given `table_path`)
/// associated to the partitions defined by `table_partition_cols`
-fn parse_partitions_for_path<'a>(
+fn parse_partitions_for_path<'a, I>(
table_path: &ListingTableUrl,
file_path: &'a Path,
- table_partition_cols: &[String],
-) -> Option<Vec<&'a str>> {
+ table_partition_cols: I,
+) -> Option<Vec<&'a str>>
+where
+ I: IntoIterator<Item = &'a str>,
+{
let subpath = table_path.strip_prefix(file_path)?;
let mut part_values = vec![];
@@ -569,7 +590,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/file.csv"),
- &[]
+ vec![]
)
);
assert_eq!(
@@ -577,7 +598,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
&Path::from("bucket/mytable/file.csv"),
- &[]
+ vec![]
)
);
assert_eq!(
@@ -585,7 +606,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/file.csv"),
- &[String::from("mypartition")]
+ vec!["mypartition"]
)
);
assert_eq!(
@@ -593,7 +614,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/mypartition=v1/file.csv"),
- &[String::from("mypartition")]
+ vec!["mypartition"]
)
);
assert_eq!(
@@ -601,7 +622,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
&Path::from("bucket/mytable/mypartition=v1/file.csv"),
- &[String::from("mypartition")]
+ vec!["mypartition"]
)
);
// Only hive style partitioning supported for now:
@@ -610,7 +631,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/v1/file.csv"),
- &[String::from("mypartition")]
+ vec!["mypartition"]
)
);
assert_eq!(
@@ -618,7 +639,7 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
- &[String::from("mypartition"), String::from("otherpartition")]
+ vec!["mypartition", "otherpartition"]
)
);
assert_eq!(
@@ -626,82 +647,11 @@ mod tests {
parse_partitions_for_path(
&ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
&Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
- &[String::from("mypartition")]
+ vec!["mypartition"]
)
);
}
- #[test]
- fn test_path_batch_roundtrip_no_partiton() {
- let files = vec![
- ObjectMeta {
- location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
- last_modified: to_timestamp_millis(1634722979123).unwrap(),
- size: 100,
- },
- ObjectMeta {
- location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
- last_modified: to_timestamp_millis(0).unwrap(),
- size: 100,
- },
- ];
-
- let table_path = ListingTableUrl::parse("file:///mybucket/tablepath").unwrap();
- let batches = paths_to_batch(&[], &table_path, &files)
- .expect("Serialization of file list to batch failed");
-
- let parsed_files = batches_to_paths(&[batches]).unwrap();
- assert_eq!(parsed_files.len(), 2);
- assert_eq!(&parsed_files[0].partition_values, &[]);
- assert_eq!(&parsed_files[1].partition_values, &[]);
-
- let parsed_metas = parsed_files
- .into_iter()
- .map(|pf| pf.object_meta)
- .collect::<Vec<_>>();
- assert_eq!(parsed_metas, files);
- }
-
- #[test]
- fn test_path_batch_roundtrip_with_partition() {
- let files = vec![
- ObjectMeta {
- location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
- last_modified: to_timestamp_millis(1634722979123).unwrap(),
- size: 100,
- },
- ObjectMeta {
- location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
- last_modified: to_timestamp_millis(0).unwrap(),
- size: 100,
- },
- ];
-
- let batches = paths_to_batch(
- &[(String::from("part1"), DataType::Utf8)],
- &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
- &files,
- )
- .expect("Serialization of file list to batch failed");
-
- let parsed_files = batches_to_paths(&[batches]).unwrap();
- assert_eq!(parsed_files.len(), 2);
- assert_eq!(
- &parsed_files[0].partition_values,
- &[ScalarValue::Utf8(Some(String::from("val1")))]
- );
- assert_eq!(
- &parsed_files[1].partition_values,
- &[ScalarValue::Utf8(Some(String::from("val2")))]
- );
-
- let parsed_metas = parsed_files
- .into_iter()
- .map(|pf| pf.object_meta)
- .collect::<Vec<_>>();
- assert_eq!(parsed_metas, files);
- }
-
#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 798359208f..dc96f959e4 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -124,6 +124,25 @@ impl ListingTableUrl {
self.url.scheme()
}
+ /// Return the prefix from which to list files
+ pub fn prefix(&self) -> &Path {
+ &self.prefix
+ }
+
+ /// Returns `true` if `path` matches this [`ListingTableUrl`]
+ pub fn contains(&self, path: &Path) -> bool {
+ match self.strip_prefix(path) {
+ Some(mut segments) => match &self.glob {
+ Some(glob) => {
+ let stripped = segments.join("/");
+ glob.matches(&stripped)
+ }
+ None => true,
+ },
+ None => false,
+ }
+ }
+
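A usage sketch of the new `contains` method, assuming the `datafusion` and `object_store` crates; the paths are made up for illustration:

    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;
    use object_store::path::Path;

    fn main() -> Result<()> {
        // Hypothetical table URL; no glob, so `contains` reduces to a prefix
        // check. When the table was registered with a glob, the remaining
        // segments are additionally matched against it.
        let table = ListingTableUrl::parse("file:///data/table/")?;
        assert_eq!(table.prefix().as_ref(), "data/table");

        // A file below the table prefix is contained.
        assert!(table.contains(&Path::from("data/table/a=1/file.parquet")));
        // A file outside the prefix is not.
        assert!(!table.contains(&Path::from("other/file.parquet")));
        Ok(())
    }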
/// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
/// an iterator of the remaining path segments
pub(crate) fn strip_prefix<'a, 'b: 'a>(
@@ -131,12 +150,11 @@ impl ListingTableUrl {
path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
use object_store::path::DELIMITER;
- let path: &str = path.as_ref();
- let stripped = match self.prefix.as_ref() {
- "" => path,
- p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?,
- };
- Some(stripped.split(DELIMITER))
+ let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
+ if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
+ stripped = stripped.strip_prefix(DELIMITER)?;
+ }
+ Some(stripped.split_terminator(DELIMITER))
}
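A standard-library sketch of why `split_terminator` replaces `split` here: for an exact prefix match the stripped remainder is empty, and `split` would still produce one empty segment:

    fn main() {
        let stripped = ""; // remainder after stripping an exact-match prefix

        let with_split: Vec<&str> = stripped.split('/').collect();
        let with_terminator: Vec<&str> = stripped.split_terminator('/').collect();

        assert_eq!(with_split, vec![""]); // one spurious empty segment
        assert!(with_terminator.is_empty()); // zero segments, as the new test below expects
    }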
/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
@@ -158,17 +176,7 @@ impl ListingTableUrl {
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
- let glob_match = match &self.glob {
- Some(glob) => match self.strip_prefix(path) {
- Some(mut segments) => {
- let stripped = segments.join("/");
- glob.matches(&stripped)
- }
- None => false,
- },
- None => true,
- };
-
+ let glob_match = self.contains(path);
futures::future::ready(extension_match && glob_match)
})
.boxed()
@@ -254,6 +262,10 @@ mod tests {
let child = Path::parse("/foob/bar").unwrap();
assert!(url.strip_prefix(&child).is_none());
+ let url = ListingTableUrl::parse("file:///foo/file").unwrap();
+ let child = Path::parse("/foo/file").unwrap();
+ assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
+
let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/ bar");
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index b93ad02aa2..afaac5a7bd 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -18,6 +18,7 @@
//! Test queries on partitioned datasets
use arrow::datatypes::DataType;
+use std::collections::BTreeSet;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
@@ -39,8 +40,8 @@ use datafusion::{
test_util::{self, arrow_test_data, parquet_test_data},
};
use datafusion_common::ScalarValue;
+use futures::stream;
use futures::stream::BoxStream;
-use futures::{stream, StreamExt};
use object_store::{
path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
};
@@ -594,7 +595,7 @@ async fn register_partitioned_alltypes_parquet(
/// An object store implementation that mirrors a given file to multiple paths.
pub struct MirroringObjectStore {
/// The `(path,size)` of the files that "exist" in the store
- files: Vec<String>,
+ files: Vec<Path>,
/// The file that will be read at all paths
mirrored_file: String,
/// Size of the mirrored file
@@ -611,7 +612,7 @@ impl MirroringObjectStore {
pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
let metadata = std::fs::metadata(&mirrored_file).expect("Local file metadata");
Arc::new(Self {
- files: paths.iter().map(|&f| f.to_owned()).collect(),
+ files: paths.iter().map(|f| Path::parse(f).unwrap()).collect(),
mirrored_file,
file_size: metadata.len(),
})
@@ -640,7 +641,7 @@ impl ObjectStore for MirroringObjectStore {
}
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
- self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ self.files.iter().find(|x| *x == location).unwrap();
let path = std::path::PathBuf::from(&self.mirrored_file);
let file = File::open(&path).unwrap();
Ok(GetResult::File(file, path))
@@ -651,7 +652,7 @@ impl ObjectStore for MirroringObjectStore {
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
- self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ self.files.iter().find(|x| *x == location).unwrap();
let path = std::path::PathBuf::from(&self.mirrored_file);
let mut file = File::open(path).unwrap();
file.seek(SeekFrom::Start(range.start as u64)).unwrap();
@@ -665,7 +666,7 @@ impl ObjectStore for MirroringObjectStore {
}
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
- self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+ self.files.iter().find(|x| *x == location).unwrap();
Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc.timestamp_nanos(0),
@@ -681,30 +682,64 @@ impl ObjectStore for MirroringObjectStore {
&self,
prefix: Option<&Path>,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
- let prefix = prefix.map(|p| p.as_ref()).unwrap_or("").to_string();
- let size = self.file_size as usize;
- Ok(Box::pin(
- stream::iter(
- self.files
- .clone()
- .into_iter()
- .filter(move |f| f.starts_with(&prefix)),
- )
- .map(move |f| {
- Ok(ObjectMeta {
- location: Path::parse(f)?,
- last_modified: Utc.timestamp_nanos(0),
- size,
+ let prefix = prefix.cloned().unwrap_or_default();
+ Ok(Box::pin(stream::iter(self.files.iter().filter_map(
+ move |location| {
+ // Don't return for exact prefix match
+ let filter = location
+ .prefix_match(&prefix)
+ .map(|mut x| x.next().is_some())
+ .unwrap_or(false);
+
+ filter.then(|| {
+ Ok(ObjectMeta {
+ location: location.clone(),
+ last_modified: Utc.timestamp_nanos(0),
+ size: self.file_size as usize,
+ })
})
- }),
- ))
+ },
+ ))))
}
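A usage sketch of the `Path::prefix_match` check used above to skip exact prefix matches, assuming the `object_store` crate and made-up paths:

    use object_store::path::Path;

    fn main() {
        let prefix = Path::from("table/a=1");
        let is_child = |p: &Path| {
            p.prefix_match(&prefix)
                .map(|mut parts| parts.next().is_some())
                .unwrap_or(false)
        };

        // A key below the prefix has at least one remaining part.
        assert!(is_child(&Path::from("table/a=1/file.parquet")));
        // The prefix itself matches but has no remaining parts, so it is
        // skipped ("don't return for exact prefix match").
        assert!(!is_child(&Path::from("table/a=1")));
        // An unrelated key does not match the prefix at all.
        assert!(!is_child(&Path::from("other/file.parquet")));
    }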
async fn list_with_delimiter(
&self,
- _prefix: Option<&Path>,
+ prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
- unimplemented!()
+ let root = Path::default();
+ let prefix = prefix.unwrap_or(&root);
+
+ let mut common_prefixes = BTreeSet::new();
+ let mut objects = vec![];
+
+ for k in &self.files {
+ let mut parts = match k.prefix_match(prefix) {
+ Some(parts) => parts,
+ None => continue,
+ };
+
+ // Pop first element
+ let common_prefix = match parts.next() {
+ Some(p) => p,
+ // Should only return children of the prefix
+ None => continue,
+ };
+
+ if parts.next().is_some() {
+ common_prefixes.insert(prefix.child(common_prefix));
+ } else {
+ let object = ObjectMeta {
+ location: k.clone(),
+ last_modified: Utc.timestamp_nanos(0),
+ size: self.file_size as usize,
+ };
+ objects.push(object);
+ }
+ }
+ Ok(ListResult {
+ common_prefixes: common_prefixes.into_iter().collect(),
+ objects,
+ })
}
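A sketch of the grouping rule this `list_with_delimiter` implements, shown on hypothetical keys: direct children of the prefix become objects, while deeper keys contribute their first remaining segment as a common prefix:

    use std::collections::BTreeSet;
    use object_store::path::Path;

    fn main() {
        let keys = [
            Path::from("table/a=1/file1.parquet"),
            Path::from("table/a=2/file2.parquet"),
            Path::from("table/top.csv"),
        ];
        let prefix = Path::from("table");

        let mut common_prefixes = BTreeSet::new();
        let mut objects = vec![];
        for k in &keys {
            if let Some(mut parts) = k.prefix_match(&prefix) {
                if let Some(first) = parts.next() {
                    if parts.next().is_some() {
                        // Deeper than one level: record the child "directory".
                        common_prefixes.insert(prefix.child(first));
                    } else {
                        // Exactly one level below the prefix: a direct child object.
                        objects.push(k.clone());
                    }
                }
            }
        }

        assert_eq!(objects, vec![Path::from("table/top.csv")]);
        assert_eq!(
            common_prefixes.into_iter().collect::<Vec<_>>(),
            vec![Path::from("table/a=1"), Path::from("table/a=2")]
        );
    }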
async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {