Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/17 13:05:14 UTC

[arrow-datafusion] branch main updated: Faster ListingTable partition listing (#6182) (#6183)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f808f4c81 Faster ListingTable partition listing (#6182) (#6183)
9f808f4c81 is described below

commit 9f808f4c81563a4813f7612430ac267351b9f005
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 17 14:05:08 2023 +0100

    Faster ListingTable partition listing (#6182) (#6183)
    
    * Faster ListingTable partition listing (#6182)
    
    * Fix strip_prefix
    
    * Fix strip_prefix
    
    * Implement list_with_delimiter for MirroringObjectStore
    
    * Use split_terminator
    
    * Fix MirroringObjectStore::list_with_delimiter
    
    * Fix logical conflict
    
    * Add logs
    
    * Limit concurrency
    
    * Increase concurrency limit
    
    * Review feedback
---
 datafusion/core/src/datasource/listing/helpers.rs | 514 ++++++++++------------
 datafusion/core/src/datasource/listing/url.rs     |  46 +-
 datafusion/core/tests/path_partition.rs           |  83 +++-
 3 files changed, 320 insertions(+), 323 deletions(-)
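
The heart of the change is to walk the partition directories with
`ObjectStore::list_with_delimiter`, keeping a bounded number of listing
requests in flight, rather than flat-listing every file and filtering through
a `MemTable`. Below is a minimal, standalone sketch of that bounded-concurrency
walk (not the DataFusion code itself); `list_children`, `list_one` and `walk`
are hypothetical stand-ins for the real `ObjectStore` and `Partition` APIs.

    use futures::stream::{FuturesUnordered, StreamExt};

    const CONCURRENCY_LIMIT: usize = 100;

    /// Hypothetical listing primitive standing in for
    /// `ObjectStore::list_with_delimiter`: returns the child "directories" of `path`.
    async fn list_children(_path: &str) -> std::io::Result<Vec<String>> {
        Ok(Vec::new()) // placeholder body for the sketch
    }

    /// Lists one path and tags the result with the depth it was found at.
    async fn list_one(path: String, depth: usize) -> std::io::Result<(String, usize, Vec<String>)> {
        let children = list_children(&path).await?;
        Ok((path, depth, children))
    }

    /// Breadth-first walk down to `max_depth`, keeping at most
    /// `CONCURRENCY_LIMIT` listing requests in flight at any time.
    async fn walk(root: String, max_depth: usize) -> std::io::Result<Vec<String>> {
        let mut out = Vec::new();
        let mut pending = Vec::new();
        let mut futures = FuturesUnordered::new();
        futures.push(list_one(root, 0));

        while let Some((path, depth, children)) = futures.next().await.transpose()? {
            // A non-empty `pending` means `futures` was previously full, so one
            // queued listing can be promoted back into the working set
            if let Some(next) = pending.pop() {
                futures.push(next);
            }
            out.push(path);
            for child in children {
                if depth < max_depth {
                    let fut = list_one(child, depth + 1);
                    if futures.len() < CONCURRENCY_LIMIT {
                        futures.push(fut);
                    } else {
                        pending.push(fut);
                    }
                } else {
                    out.push(child);
                }
            }
        }
        Ok(out)
    }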

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 6032c49885..7a0dd25351 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -19,37 +19,32 @@
 
 use std::sync::Arc;
 
-use arrow::array::new_empty_array;
+use arrow::compute::{and, cast, prep_null_mask_filter};
 use arrow::{
-    array::{ArrayBuilder, ArrayRef, Date64Builder, StringBuilder, UInt64Builder},
+    array::{ArrayRef, StringBuilder},
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
 };
-use chrono::{TimeZone, Utc};
-use futures::{stream::BoxStream, TryStreamExt};
-use log::debug;
+use arrow_array::cast::AsArray;
+use arrow_array::Array;
+use arrow_schema::Fields;
+use futures::stream::FuturesUnordered;
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use log::{debug, trace};
 
-use crate::{
-    datasource::MemTable, error::Result, execution::context::SessionContext,
-    scalar::ScalarValue,
-};
+use crate::{error::Result, scalar::ScalarValue};
 
 use super::PartitionedFile;
 use crate::datasource::listing::ListingTableUrl;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
-use datafusion_common::{
-    cast::{as_date64_array, as_string_array, as_uint64_array},
-    Column, DataFusionError,
-};
+use datafusion_common::{Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::expr::ScalarUDF;
 use datafusion_expr::{Expr, Volatility};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::execution_props::ExecutionProps;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 
-const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
-const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
-const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";
-
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
 /// - the table provider can filter the table partition values with this expression
@@ -137,6 +132,9 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
     is_applicable
 }
 
+/// The maximum number of concurrent listing requests
+const CONCURRENCY_LIMIT: usize = 100;
+
 /// Partition the list of files into `n` groups
 pub fn split_files(
     partitioned_files: Vec<PartitionedFile>,
@@ -153,225 +151,248 @@ pub fn split_files(
         .collect()
 }
 
+struct Partition {
+    /// The path to the partition, including the table prefix
+    path: Path,
+    /// How many path segments `path` contains below the table prefix,
+    /// or equivalently the number of partition values in `path`
+    depth: usize,
+    /// The files contained as direct children of this `Partition` if known
+    files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+    /// List the direct children of this partition, updating `self.files` with
+    /// any child files and returning a list of child "directories"
+    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+        trace!("Listing partition {}", self.path);
+        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
+        let result = store.list_with_delimiter(prefix).await?;
+        self.files = Some(result.objects);
+        Ok((self, result.common_prefixes))
+    }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
+async fn list_partitions(
+    store: &dyn ObjectStore,
+    table_path: &ListingTableUrl,
+    max_depth: usize,
+) -> Result<Vec<Partition>> {
+    let partition = Partition {
+        path: table_path.prefix().clone(),
+        depth: 0,
+        files: None,
+    };
+
+    let mut out = Vec::with_capacity(64);
+
+    let mut pending = vec![];
+    let mut futures = FuturesUnordered::new();
+    futures.push(partition.list(store));
+
+    while let Some((partition, paths)) = futures.next().await.transpose()? {
+        // If `pending` contains a future, it implies that prior to this iteration
+        // `futures.len() == CONCURRENCY_LIMIT`. We can therefore add a single
+        // future from `pending` to the working set
+        if let Some(next) = pending.pop() {
+            futures.push(next)
+        }
+
+        let depth = partition.depth;
+        out.push(partition);
+        for path in paths {
+            let child = Partition {
+                path,
+                depth: depth + 1,
+                files: None,
+            };
+            match depth < max_depth {
+                true => match futures.len() < CONCURRENCY_LIMIT {
+                    true => futures.push(child.list(store)),
+                    false => pending.push(child.list(store)),
+                },
+                false => out.push(child),
+            }
+        }
+    }
+    Ok(out)
+}
+
+async fn prune_partitions(
+    table_path: &ListingTableUrl,
+    partitions: Vec<Partition>,
+    filters: &[Expr],
+    partition_cols: &[(String, DataType)],
+) -> Result<Vec<Partition>> {
+    if filters.is_empty() {
+        return Ok(partitions);
+    }
+
+    let mut builders: Vec<_> = (0..partition_cols.len())
+        .map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
+        .collect();
+
+    for partition in &partitions {
+        let cols = partition_cols.iter().map(|x| x.0.as_str());
+        let parsed = parse_partitions_for_path(table_path, &partition.path, cols)
+            .unwrap_or_default();
+
+        let mut builders = builders.iter_mut();
+        for (p, b) in parsed.iter().zip(&mut builders) {
+            b.append_value(p);
+        }
+        builders.for_each(|b| b.append_null());
+    }
+
+    let arrays = partition_cols
+        .iter()
+        .zip(builders)
+        .map(|((_, d), mut builder)| {
+            let array = builder.finish();
+            cast(&array, d)
+        })
+        .collect::<Result<_, _>>()?;
+
+    let fields: Fields = partition_cols
+        .iter()
+        .map(|(n, d)| Field::new(n, d.clone(), true))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    let df_schema = DFSchema::new_with_metadata(
+        partition_cols
+            .iter()
+            .map(|(n, d)| DFField::new_unqualified(n, d.clone(), true))
+            .collect(),
+        Default::default(),
+    )?;
+
+    let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+    // TODO: Plumb this down
+    let props = ExecutionProps::new();
+
+    // Applies `filter` to `batch` returning `None` on error
+    let do_filter = |filter| -> Option<ArrayRef> {
+        let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
+        Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
+    };
+
+    // Compute the conjunction of the filters, ignoring errors
+    let mask = filters
+        .iter()
+        .fold(None, |acc, filter| match (acc, do_filter(filter)) {
+            (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
+            (None, Some(r)) => Some(r.as_boolean().clone()),
+            (r, None) => r,
+        });
+
+    let mask = match mask {
+        Some(mask) => mask,
+        None => return Ok(partitions),
+    };
+
+    // Don't retain partitions that evaluated to null
+    let prepared = match mask.null_count() {
+        0 => mask,
+        _ => prep_null_mask_filter(&mask),
+    };
+
+    // Sanity check
+    assert_eq!(prepared.len(), partitions.len());
+
+    let filtered = partitions
+        .into_iter()
+        .zip(prepared.values())
+        .filter_map(|(p, f)| f.then_some(p))
+        .collect();
+
+    Ok(filtered)
+}
+
 /// Discover the partitions on the given path and prune out files
 /// that belong to irrelevant partitions using `filters` expressions.
 /// `filters` might contain expressions that can be resolved only at the
 /// file level (e.g. Parquet row group pruning).
-///
-/// TODO for tables with many files (10k+), it will usually more efficient
-/// to first list the folders relative to the first partition dimension,
-/// prune those, then list only the contain of the remaining folders.
 pub async fn pruned_partition_list<'a>(
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
     filters: &'a [Expr],
     file_extension: &'a str,
-    table_partition_cols: &'a [(String, DataType)],
+    partition_cols: &'a [(String, DataType)],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
     let list = table_path.list_all_files(store, file_extension);
 
     // if no partition col => simply list all the files
-    if table_partition_cols.is_empty() {
+    if partition_cols.is_empty() {
         return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into())));
     }
 
-    let applicable_filters: Vec<_> = filters
-        .iter()
-        .filter(|f| {
-            expr_applicable_for_cols(
-                &table_partition_cols
-                    .iter()
-                    .map(|x| x.0.clone())
-                    .collect::<Vec<_>>(),
-                f,
-            )
-        })
-        .collect();
+    let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+    debug!("Listed {} partitions", partitions.len());
 
-    if applicable_filters.is_empty() {
-        // Parse the partition values while listing all the files
-        // Note: We might avoid parsing the partition values if they are not used in any projection,
-        // but the cost of parsing will likely be far dominated by the time to fetch the listing from
-        // the object store.
-        Ok(Box::pin(list.try_filter_map(
-            move |object_meta| async move {
-                let parsed_path = parse_partitions_for_path(
-                    table_path,
-                    &object_meta.location,
-                    &table_partition_cols
-                        .iter()
-                        .map(|x| x.0.clone())
-                        .collect::<Vec<_>>(),
-                )
-                .map(|p| {
-                    p.iter()
-                        .zip(table_partition_cols)
-                        .map(|(&part_value, part_column)| {
-                            ScalarValue::try_from_string(
-                                part_value.to_string(),
-                                &part_column.1,
-                            )
-                            .unwrap_or_else(|_| {
-                                panic!(
-                                    "Failed to cast str {} to type {}",
-                                    part_value, part_column.1
-                                )
-                            })
-                        })
-                        .collect()
-                });
-
-                Ok(parsed_path.map(|partition_values| PartitionedFile {
-                    partition_values,
-                    object_meta,
-                    range: None,
-                    extensions: None,
-                }))
-            },
-        )))
-    } else {
-        // parse the partition values and serde them as a RecordBatch to filter them
-        let metas: Vec<_> = list.try_collect().await?;
-        let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
-        let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
-        debug!("get mem_table: {:?}", mem_table);
-
-        // Filter the partitions using a local datafusion context
-        // TODO having the external context would allow us to resolve `Volatility::Stable`
-        // scalar functions (`ScalarFunction` & `ScalarUDF`) and `ScalarVariable`s
-        let ctx = SessionContext::new();
-        let mut df = ctx.read_table(Arc::new(mem_table))?;
-        for filter in applicable_filters {
-            df = df.filter(filter.clone())?;
-        }
-        let filtered_batches = df.collect().await?;
-        let paths = batches_to_paths(&filtered_batches)?;
+    let pruned =
+        prune_partitions(table_path, partitions, filters, partition_cols).await?;
 
-        Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok))))
-    }
-}
+    debug!("Pruning yielded {} partitions", pruned.len());
 
-/// convert the paths of the files to a record batch with the following columns:
-/// - one column for the file size named `_df_part_file_size_`
-/// - one column for with the original path named `_df_part_file_path_`
-/// - one column for with the last modified date named `_df_part_file_modified_`
-/// - ... one column by partition ...
-///
-/// Note: For the last modified date, this looses precisions higher than millisecond.
-fn paths_to_batch(
-    table_partition_cols: &[(String, DataType)],
-    table_path: &ListingTableUrl,
-    metas: &[ObjectMeta],
-) -> Result<RecordBatch> {
-    let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
-    let mut length_builder = UInt64Builder::with_capacity(metas.len());
-    let mut modified_builder = Date64Builder::with_capacity(metas.len());
-    let mut partition_scalar_values = table_partition_cols
-        .iter()
-        .map(|_| Vec::new())
-        .collect::<Vec<_>>();
-    for file_meta in metas {
-        if let Some(partition_values) = parse_partitions_for_path(
-            table_path,
-            &file_meta.location,
-            &table_partition_cols
-                .iter()
-                .map(|x| x.0.clone())
-                .collect::<Vec<_>>(),
-        ) {
-            key_builder.append_value(file_meta.location.as_ref());
-            length_builder.append_value(file_meta.size as u64);
-            modified_builder.append_value(file_meta.last_modified.timestamp_millis());
-            for (i, part_val) in partition_values.iter().enumerate() {
-                let scalar_val = ScalarValue::try_from_string(
-                    part_val.to_string(),
-                    &table_partition_cols[i].1,
-                )?;
-                partition_scalar_values[i].push(scalar_val);
-            }
-        } else {
-            debug!("No partitioning for path {}", file_meta.location);
-        }
-    }
+    let stream = futures::stream::iter(pruned)
+        .map(move |partition: Partition| async move {
+            let cols = partition_cols.iter().map(|x| x.0.as_str());
+            let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
 
-    // finish all builders
-    let mut col_arrays: Vec<ArrayRef> = vec![
-        ArrayBuilder::finish(&mut key_builder),
-        ArrayBuilder::finish(&mut length_builder),
-        ArrayBuilder::finish(&mut modified_builder),
-    ];
-    for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
-        if part_scalar_val.is_empty() {
-            col_arrays.push(new_empty_array(&table_partition_cols[i].1));
-        } else {
-            let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
-            col_arrays.push(partition_val_array);
-        }
-    }
-
-    // put the schema together
-    let mut fields = vec![
-        Field::new(FILE_PATH_COLUMN_NAME, DataType::Utf8, false),
-        Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
-        Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
-    ];
-    for part_col in table_partition_cols {
-        fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
-    }
+            let partition_values = parsed
+                .into_iter()
+                .flatten()
+                .zip(partition_cols)
+                .map(|(parsed, (_, datatype))| {
+                    ScalarValue::try_from_string(parsed.to_string(), datatype)
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let files = match partition.files {
+                Some(files) => files,
+                None => {
+                    trace!("Recursively listing partition {}", partition.path);
+                    let s = store.list(Some(&partition.path)).await?;
+                    s.try_collect().await?
+                }
+            };
 
-    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
-    Ok(batch)
-}
+            let files = files.into_iter().filter(move |o| {
+                let extension_match = o.location.as_ref().ends_with(file_extension);
+                let glob_match = table_path.contains(&o.location);
+                extension_match && glob_match
+            });
 
-/// convert a set of record batches created by `paths_to_batch()` back to partitioned files.
-fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
-    batches
-        .iter()
-        .flat_map(|batch| {
-            let key_array = as_string_array(batch.column(0)).unwrap();
-            let length_array = as_uint64_array(batch.column(1)).unwrap();
-            let modified_array = as_date64_array(batch.column(2)).unwrap();
-
-            (0..batch.num_rows()).map(move |row| {
+            let stream = futures::stream::iter(files.map(move |object_meta| {
                 Ok(PartitionedFile {
-                    object_meta: ObjectMeta {
-                        location: Path::parse(key_array.value(row))
-                            .map_err(|e| DataFusionError::External(Box::new(e)))?,
-                        last_modified: to_timestamp_millis(modified_array.value(row))?,
-                        size: length_array.value(row) as usize,
-                    },
-                    partition_values: (3..batch.columns().len())
-                        .map(|col| {
-                            ScalarValue::try_from_array(batch.column(col), row).unwrap()
-                        })
-                        .collect(),
+                    object_meta,
+                    partition_values: partition_values.clone(),
                     range: None,
                     extensions: None,
                 })
-            })
-        })
-        .collect()
-}
+            }));
 
-fn to_timestamp_millis(v: i64) -> Result<chrono::DateTime<Utc>> {
-    match Utc.timestamp_millis_opt(v) {
-        chrono::LocalResult::None => Err(DataFusionError::Execution(format!(
-            "Can not convert {v} to UTC millisecond timestamp"
-        ))),
-        chrono::LocalResult::Single(v) => Ok(v),
-        chrono::LocalResult::Ambiguous(_, _) => Err(DataFusionError::Execution(format!(
-            "Ambiguous timestamp when converting {v} to UTC millisecond timestamp"
-        ))),
-    }
+            Ok::<_, DataFusionError>(stream)
+        })
+        .buffer_unordered(CONCURRENCY_LIMIT)
+        .try_flatten()
+        .boxed();
+    Ok(stream)
 }
 
 /// Extract the partition values for the given `file_path` (in the given `table_path`)
 /// associated to the partitions defined by `table_partition_cols`
-fn parse_partitions_for_path<'a>(
+fn parse_partitions_for_path<'a, I>(
     table_path: &ListingTableUrl,
     file_path: &'a Path,
-    table_partition_cols: &[String],
-) -> Option<Vec<&'a str>> {
+    table_partition_cols: I,
+) -> Option<Vec<&'a str>>
+where
+    I: IntoIterator<Item = &'a str>,
+{
     let subpath = table_path.strip_prefix(file_path)?;
 
     let mut part_values = vec![];
@@ -569,7 +590,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/file.csv"),
-                &[]
+                vec![]
             )
         );
         assert_eq!(
@@ -577,7 +598,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                 &Path::from("bucket/mytable/file.csv"),
-                &[]
+                vec![]
             )
         );
         assert_eq!(
@@ -585,7 +606,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
         assert_eq!(
@@ -593,7 +614,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
         assert_eq!(
@@ -601,7 +622,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
         // Only hive style partitioning supported for now:
@@ -610,7 +631,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/v1/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
         assert_eq!(
@@ -618,7 +639,7 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
-                &[String::from("mypartition"), String::from("otherpartition")]
+                vec!["mypartition", "otherpartition"]
             )
         );
         assert_eq!(
@@ -626,82 +647,11 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
     }
 
-    #[test]
-    fn test_path_batch_roundtrip_no_partiton() {
-        let files = vec![
-            ObjectMeta {
-                location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
-                last_modified: to_timestamp_millis(1634722979123).unwrap(),
-                size: 100,
-            },
-            ObjectMeta {
-                location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
-                last_modified: to_timestamp_millis(0).unwrap(),
-                size: 100,
-            },
-        ];
-
-        let table_path = ListingTableUrl::parse("file:///mybucket/tablepath").unwrap();
-        let batches = paths_to_batch(&[], &table_path, &files)
-            .expect("Serialization of file list to batch failed");
-
-        let parsed_files = batches_to_paths(&[batches]).unwrap();
-        assert_eq!(parsed_files.len(), 2);
-        assert_eq!(&parsed_files[0].partition_values, &[]);
-        assert_eq!(&parsed_files[1].partition_values, &[]);
-
-        let parsed_metas = parsed_files
-            .into_iter()
-            .map(|pf| pf.object_meta)
-            .collect::<Vec<_>>();
-        assert_eq!(parsed_metas, files);
-    }
-
-    #[test]
-    fn test_path_batch_roundtrip_with_partition() {
-        let files = vec![
-            ObjectMeta {
-                location: Path::from("mybucket/tablepath/part1=val1/file.parquet"),
-                last_modified: to_timestamp_millis(1634722979123).unwrap(),
-                size: 100,
-            },
-            ObjectMeta {
-                location: Path::from("mybucket/tablepath/part1=val2/file.parquet"),
-                last_modified: to_timestamp_millis(0).unwrap(),
-                size: 100,
-            },
-        ];
-
-        let batches = paths_to_batch(
-            &[(String::from("part1"), DataType::Utf8)],
-            &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
-            &files,
-        )
-        .expect("Serialization of file list to batch failed");
-
-        let parsed_files = batches_to_paths(&[batches]).unwrap();
-        assert_eq!(parsed_files.len(), 2);
-        assert_eq!(
-            &parsed_files[0].partition_values,
-            &[ScalarValue::Utf8(Some(String::from("val1")))]
-        );
-        assert_eq!(
-            &parsed_files[1].partition_values,
-            &[ScalarValue::Utf8(Some(String::from("val2")))]
-        );
-
-        let parsed_metas = parsed_files
-            .into_iter()
-            .map(|pf| pf.object_meta)
-            .collect::<Vec<_>>();
-        assert_eq!(parsed_metas, files);
-    }
-
     #[test]
     fn test_expr_applicable_for_cols() {
         assert!(expr_applicable_for_cols(
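
For context, partition values are still recovered from Hive-style `key=value`
path segments, as `parse_partitions_for_path` above does. A rough,
self-contained sketch of that parsing, using a hypothetical
`parse_partition_values` helper rather than the DataFusion function:

    /// Hypothetical helper: extract the value of each expected `key=value`
    /// segment from a path relative to the table root, returning `None` when
    /// the layout is not Hive-style.
    fn parse_partition_values<'a>(
        relative_path: &'a str,
        partition_cols: &[&str],
    ) -> Option<Vec<&'a str>> {
        let mut values = Vec::with_capacity(partition_cols.len());
        // `split_terminator` avoids a trailing empty segment for paths ending in '/'
        for (segment, col) in relative_path.split_terminator('/').zip(partition_cols) {
            match segment.split_once('=') {
                Some((key, value)) if key == *col => values.push(value),
                // only Hive-style `key=value` directories are recognised
                _ => return None,
            }
        }
        (values.len() == partition_cols.len()).then_some(values)
    }

    // e.g. parse_partition_values("year=2023/month=05/file.parquet", &["year", "month"])
    //      == Some(vec!["2023", "05"])
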
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 798359208f..dc96f959e4 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -124,6 +124,25 @@ impl ListingTableUrl {
         self.url.scheme()
     }
 
+    /// Return the prefix from which to list files
+    pub fn prefix(&self) -> &Path {
+        &self.prefix
+    }
+
+    /// Returns `true` if `path` matches this [`ListingTableUrl`]
+    pub fn contains(&self, path: &Path) -> bool {
+        match self.strip_prefix(path) {
+            Some(mut segments) => match &self.glob {
+                Some(glob) => {
+                    let stripped = segments.join("/");
+                    glob.matches(&stripped)
+                }
+                None => true,
+            },
+            None => false,
+        }
+    }
+
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
     /// an iterator of the remaining path segments
     pub(crate) fn strip_prefix<'a, 'b: 'a>(
@@ -131,12 +150,11 @@ impl ListingTableUrl {
         path: &'b Path,
     ) -> Option<impl Iterator<Item = &'b str> + 'a> {
         use object_store::path::DELIMITER;
-        let path: &str = path.as_ref();
-        let stripped = match self.prefix.as_ref() {
-            "" => path,
-            p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?,
-        };
-        Some(stripped.split(DELIMITER))
+        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
+        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
+            stripped = stripped.strip_prefix(DELIMITER)?;
+        }
+        Some(stripped.split_terminator(DELIMITER))
     }
 
     /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
@@ -158,17 +176,7 @@ impl ListingTableUrl {
             .try_filter(move |meta| {
                 let path = &meta.location;
                 let extension_match = path.as_ref().ends_with(file_extension);
-                let glob_match = match &self.glob {
-                    Some(glob) => match self.strip_prefix(path) {
-                        Some(mut segments) => {
-                            let stripped = segments.join("/");
-                            glob.matches(&stripped)
-                        }
-                        None => false,
-                    },
-                    None => true,
-                };
-
+                let glob_match = self.contains(path);
                 futures::future::ready(extension_match && glob_match)
             })
             .boxed()
@@ -254,6 +262,10 @@ mod tests {
         let child = Path::parse("/foob/bar").unwrap();
         assert!(url.strip_prefix(&child).is_none());
 
+        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
+        let child = Path::parse("/foo/file").unwrap();
+        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
+
         let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
         assert_eq!(url.prefix.as_ref(), "foo/ bar");
 
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index b93ad02aa2..afaac5a7bd 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -18,6 +18,7 @@
 //! Test queries on partitioned datasets
 
 use arrow::datatypes::DataType;
+use std::collections::BTreeSet;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
@@ -39,8 +40,8 @@ use datafusion::{
     test_util::{self, arrow_test_data, parquet_test_data},
 };
 use datafusion_common::ScalarValue;
+use futures::stream;
 use futures::stream::BoxStream;
-use futures::{stream, StreamExt};
 use object_store::{
     path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
 };
@@ -594,7 +595,7 @@ async fn register_partitioned_alltypes_parquet(
 /// An object store implementation that mirrors a given file to multiple paths.
 pub struct MirroringObjectStore {
     /// The `(path,size)` of the files that "exist" in the store
-    files: Vec<String>,
+    files: Vec<Path>,
     /// The file that will be read at all paths
     mirrored_file: String,
     /// Size of the mirrored file
@@ -611,7 +612,7 @@ impl MirroringObjectStore {
     pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
         let metadata = std::fs::metadata(&mirrored_file).expect("Local file metadata");
         Arc::new(Self {
-            files: paths.iter().map(|&f| f.to_owned()).collect(),
+            files: paths.iter().map(|f| Path::parse(f).unwrap()).collect(),
             mirrored_file,
             file_size: metadata.len(),
         })
@@ -640,7 +641,7 @@ impl ObjectStore for MirroringObjectStore {
     }
 
     async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
-        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        self.files.iter().find(|x| *x == location).unwrap();
         let path = std::path::PathBuf::from(&self.mirrored_file);
         let file = File::open(&path).unwrap();
         Ok(GetResult::File(file, path))
@@ -651,7 +652,7 @@ impl ObjectStore for MirroringObjectStore {
         location: &Path,
         range: Range<usize>,
     ) -> object_store::Result<Bytes> {
-        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        self.files.iter().find(|x| *x == location).unwrap();
         let path = std::path::PathBuf::from(&self.mirrored_file);
         let mut file = File::open(path).unwrap();
         file.seek(SeekFrom::Start(range.start as u64)).unwrap();
@@ -665,7 +666,7 @@ impl ObjectStore for MirroringObjectStore {
     }
 
     async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
-        self.files.iter().find(|x| *x == location.as_ref()).unwrap();
+        self.files.iter().find(|x| *x == location).unwrap();
         Ok(ObjectMeta {
             location: location.clone(),
             last_modified: Utc.timestamp_nanos(0),
@@ -681,30 +682,64 @@ impl ObjectStore for MirroringObjectStore {
         &self,
         prefix: Option<&Path>,
     ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
-        let prefix = prefix.map(|p| p.as_ref()).unwrap_or("").to_string();
-        let size = self.file_size as usize;
-        Ok(Box::pin(
-            stream::iter(
-                self.files
-                    .clone()
-                    .into_iter()
-                    .filter(move |f| f.starts_with(&prefix)),
-            )
-            .map(move |f| {
-                Ok(ObjectMeta {
-                    location: Path::parse(f)?,
-                    last_modified: Utc.timestamp_nanos(0),
-                    size,
+        let prefix = prefix.cloned().unwrap_or_default();
+        Ok(Box::pin(stream::iter(self.files.iter().filter_map(
+            move |location| {
+                // Don't return for exact prefix match
+                let filter = location
+                    .prefix_match(&prefix)
+                    .map(|mut x| x.next().is_some())
+                    .unwrap_or(false);
+
+                filter.then(|| {
+                    Ok(ObjectMeta {
+                        location: location.clone(),
+                        last_modified: Utc.timestamp_nanos(0),
+                        size: self.file_size as usize,
+                    })
                 })
-            }),
-        ))
+            },
+        ))))
     }
 
     async fn list_with_delimiter(
         &self,
-        _prefix: Option<&Path>,
+        prefix: Option<&Path>,
     ) -> object_store::Result<ListResult> {
-        unimplemented!()
+        let root = Path::default();
+        let prefix = prefix.unwrap_or(&root);
+
+        let mut common_prefixes = BTreeSet::new();
+        let mut objects = vec![];
+
+        for k in &self.files {
+            let mut parts = match k.prefix_match(prefix) {
+                Some(parts) => parts,
+                None => continue,
+            };
+
+            // Pop first element
+            let common_prefix = match parts.next() {
+                Some(p) => p,
+                // Should only return children of the prefix
+                None => continue,
+            };
+
+            if parts.next().is_some() {
+                common_prefixes.insert(prefix.child(common_prefix));
+            } else {
+                let object = ObjectMeta {
+                    location: k.clone(),
+                    last_modified: Utc.timestamp_nanos(0),
+                    size: self.file_size as usize,
+                };
+                objects.push(object);
+            }
+        }
+        Ok(ListResult {
+            common_prefixes: common_prefixes.into_iter().collect(),
+            objects,
+        })
     }
 
     async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {