Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/16 20:41:33 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6183: Faster ListingTable partition listing (#6182)

alamb commented on code in PR #6183:
URL: https://github.com/apache/arrow-datafusion/pull/6183#discussion_r1195663514


##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -153,225 +151,239 @@ pub fn split_files(
         .collect()
 }
 
+struct Partition {

Review Comment:
   I think adding some comments about specifically what the `depth` and `files` fields mean would help readability. For example, does `files` contain only files or does it also include paths, and what does `depth` signify?
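   Something like the following, for example (the wording is just my sketch of the intended semantics):
   
       /// A partition (directory) discovered while walking the table's path
       struct Partition {
           /// The path of this partition in the object store
           path: Path,
           /// How many levels below the table's root prefix this partition sits
           depth: usize,
           /// The files (not directories) directly under `path`; `None` until
           /// `list` has been called and populated them from the store
           files: Option<Vec<ObjectMeta>>,
       }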



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -153,225 +151,239 @@ pub fn split_files(
         .collect()
 }
 
+struct Partition {
+    path: Path,
+    depth: usize,
+    files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+        trace!("Listing partition {}", self.path);
+        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
+        let result = store.list_with_delimiter(prefix).await?;
+        self.files = Some(result.objects);
+        Ok((self, result.common_prefixes))
+    }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
+async fn list_partitions(
+    store: &dyn ObjectStore,
+    table_path: &ListingTableUrl,
+    max_depth: usize,
+) -> Result<Vec<Partition>> {
+    let partition = Partition {
+        path: table_path.prefix().clone(),
+        depth: 0,
+        files: None,
+    };
+
+    let mut out = Vec::with_capacity(64);
+
+    let mut pending = vec![];
+    let mut futures = FuturesUnordered::new();
+    futures.push(partition.list(store));
+
+    while let Some((partition, paths)) = futures.next().await.transpose()? {
+        if let Some(next) = pending.pop() {
+            futures.push(next)
+        }
+
+        let depth = partition.depth;
+        out.push(partition);
+        for path in paths {
+            let child = Partition {
+                path,
+                depth: depth + 1,
+                files: None,
+            };
+            match depth < max_depth {
+                true => match futures.len() < CONCURRENCY_LIMIT {
+                    true => futures.push(child.list(store)),
+                    false => pending.push(child.list(store)),
+                },
+                false => out.push(child),
+            }
+        }
+    }
+    Ok(out)
+}
+
+async fn prune_partitions(
+    table_path: &ListingTableUrl,
+    partitions: Vec<Partition>,
+    filters: &[Expr],
+    partition_cols: &[(String, DataType)],
+) -> Result<Vec<Partition>> {
+    if filters.is_empty() {
+        return Ok(partitions);
+    }
+
+    let mut builders: Vec<_> = (0..partition_cols.len())
+        .map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
+        .collect();
+
+    for partition in &partitions {
+        let cols = partition_cols.iter().map(|x| x.0.as_str());
+        let parsed = parse_partitions_for_path(table_path, &partition.path, cols)
+            .unwrap_or_default();
+
+        let mut builders = builders.iter_mut();
+        for (p, b) in parsed.iter().zip(&mut builders) {
+            b.append_value(p);
+        }
+        builders.for_each(|b| b.append_null());
+    }
+
+    let arrays = partition_cols
+        .iter()
+        .zip(builders)
+        .map(|((_, d), mut builder)| {
+            let array = builder.finish();
+            cast(&array, d)
+        })
+        .collect::<Result<_, _>>()?;
+
+    let fields: Fields = partition_cols
+        .iter()
+        .map(|(n, d)| Field::new(n, d.clone(), true))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    let df_schema = DFSchema::new_with_metadata(
+        partition_cols
+            .iter()
+            .map(|(n, d)| DFField::new_unqualified(n, d.clone(), true))
+            .collect(),
+        Default::default(),
+    )?;
+
+    let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+    // TODO: Plumb this down
+    let props = ExecutionProps::new();
+
+    // Applies `filter` to `batch` returning `None` on error
+    let do_filter = |filter| -> Option<ArrayRef> {
+        let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
+        Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
+    };
+
+    // Compute the conjunction of the filters, ignoring errors
+    let mask = filters
+        .iter()
+        .fold(None, |acc, filter| match (acc, do_filter(filter)) {
+            (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
+            (None, Some(r)) => Some(r.as_boolean().clone()),
+            (r, None) => r,
+        });
+
+    let mask = match mask {
+        Some(mask) => mask,
+        None => return Ok(partitions),
+    };
+
+    // Don't retain partitions that evaluated to null
+    let prepared = match mask.null_count() {
+        0 => mask,
+        _ => prep_null_mask_filter(&mask),
+    };
+
+    // Sanity check
+    assert_eq!(prepared.len(), partitions.len());
+
+    let filtered = partitions
+        .into_iter()
+        .zip(prepared.values())
+        .filter_map(|(p, f)| f.then_some(p))
+        .collect();
+
+    Ok(filtered)
+}
+
 /// Discover the partitions on the given path and prune out files
 /// that belong to irrelevant partitions using `filters` expressions.
 /// `filters` might contain expressions that can be resolved only at the
 /// file level (e.g. Parquet row group pruning).
-///
-/// TODO for tables with many files (10k+), it will usually more efficient
-/// to first list the folders relative to the first partition dimension,
-/// prune those, then list only the contain of the remaining folders.
 pub async fn pruned_partition_list<'a>(
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
     filters: &'a [Expr],
     file_extension: &'a str,
-    table_partition_cols: &'a [(String, DataType)],
+    partition_cols: &'a [(String, DataType)],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
     let list = table_path.list_all_files(store, file_extension);
 
     // if no partition col => simply list all the files
-    if table_partition_cols.is_empty() {
+    if partition_cols.is_empty() {
         return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into())));
     }
 
-    let applicable_filters: Vec<_> = filters
-        .iter()
-        .filter(|f| {
-            expr_applicable_for_cols(
-                &table_partition_cols
-                    .iter()
-                    .map(|x| x.0.clone())
-                    .collect::<Vec<_>>(),
-                f,
-            )
-        })
-        .collect();
+    let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+    info!("Listed {} partitions", partitions.len());

Review Comment:
   I recommend this be `debug!` as it is not likely to be useful during general operation.
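   For example, the same message at a lower level:
   
       debug!("Listed {} partitions", partitions.len());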



##########
datafusion/core/src/datasource/listing/url.rs:
##########
@@ -158,17 +176,7 @@ impl ListingTableUrl {
             .try_filter(move |meta| {
                 let path = &meta.location;
                 let extension_match = path.as_ref().ends_with(file_extension);
-                let glob_match = match &self.glob {
-                    Some(glob) => match self.strip_prefix(path) {
-                        Some(mut segments) => {
-                            let stripped = segments.join("/");
-                            glob.matches(&stripped)
-                        }
-                        None => false,
-                    },
-                    None => true,
-                };
-
+                let glob_match = self.contains(path);

Review Comment:
   this is a nice refactoring
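   Presumably `contains` just encapsulates the old glob logic, something like:
   
       fn contains(&self, path: &Path) -> bool {
           match &self.glob {
               Some(glob) => match self.strip_prefix(path) {
                   Some(mut segments) => {
                       let stripped = segments.join("/");
                       glob.matches(&stripped)
                   }
                   None => false,
               },
               None => true,
           }
       }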



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -610,98 +622,27 @@ mod tests {
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/v1/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
-                &[String::from("mypartition"), String::from("otherpartition")]
+                vec!["mypartition", "otherpartition"]
             )
         );
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
-                &[String::from("mypartition")]
+                vec!["mypartition"]
             )
         );
     }
 
-    #[test]
-    fn test_path_batch_roundtrip_no_partiton() {

Review Comment:
   Why were these tests removed?



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -153,225 +151,239 @@ pub fn split_files(
+    let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+    info!("Listed {} partitions", partitions.len());
 
-    if applicable_filters.is_empty() {
-        // Parse the partition values while listing all the files
-        // Note: We might avoid parsing the partition values if they are not used in any projection,
-        // but the cost of parsing will likely be far dominated by the time to fetch the listing from
-        // the object store.
-        Ok(Box::pin(list.try_filter_map(
-            move |object_meta| async move {
-                let parsed_path = parse_partitions_for_path(
-                    table_path,
-                    &object_meta.location,
-                    &table_partition_cols
-                        .iter()
-                        .map(|x| x.0.clone())
-                        .collect::<Vec<_>>(),
-                )
-                .map(|p| {
-                    p.iter()
-                        .zip(table_partition_cols)
-                        .map(|(&part_value, part_column)| {
-                            ScalarValue::try_from_string(
-                                part_value.to_string(),
-                                &part_column.1,
-                            )
-                            .unwrap_or_else(|_| {
-                                panic!(
-                                    "Failed to cast str {} to type {}",
-                                    part_value, part_column.1
-                                )
-                            })
-                        })
-                        .collect()
-                });
-
-                Ok(parsed_path.map(|partition_values| PartitionedFile {
-                    partition_values,
-                    object_meta,
-                    range: None,
-                    extensions: None,
-                }))
-            },
-        )))
-    } else {
-        // parse the partition values and serde them as a RecordBatch to filter them
-        let metas: Vec<_> = list.try_collect().await?;
-        let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
-        let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
-        debug!("get mem_table: {:?}", mem_table);
-
-        // Filter the partitions using a local datafusion context
-        // TODO having the external context would allow us to resolve `Volatility::Stable`
-        // scalar functions (`ScalarFunction` & `ScalarUDF`) and `ScalarVariable`s
-        let ctx = SessionContext::new();
-        let mut df = ctx.read_table(Arc::new(mem_table))?;
-        for filter in applicable_filters {
-            df = df.filter(filter.clone())?;
-        }
-        let filtered_batches = df.collect().await?;
-        let paths = batches_to_paths(&filtered_batches)?;
+    let pruned =
+        prune_partitions(table_path, partitions, filters, partition_cols).await?;
 
-        Ok(Box::pin(futures::stream::iter(paths.into_iter().map(Ok))))
-    }
-}
+    info!("Pruning yielded {} partitions", pruned.len());

Review Comment:
   Likewise, I recommend `debug!` here.



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -153,225 +151,239 @@ pub fn split_files(
         .collect()
 }
 
+struct Partition {
+    path: Path,
+    depth: usize,
+    files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+        trace!("Listing partition {}", self.path);
+        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
+        let result = store.list_with_delimiter(prefix).await?;
+        self.files = Some(result.objects);
+        Ok((self, result.common_prefixes))
+    }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
+async fn list_partitions(
+    store: &dyn ObjectStore,
+    table_path: &ListingTableUrl,
+    max_depth: usize,
+) -> Result<Vec<Partition>> {
+    let partition = Partition {
+        path: table_path.prefix().clone(),
+        depth: 0,
+        files: None,
+    };
+
+    let mut out = Vec::with_capacity(64);
+
+    let mut pending = vec![];
+    let mut futures = FuturesUnordered::new();
+    futures.push(partition.list(store));
+
+    while let Some((partition, paths)) = futures.next().await.transpose()? {
+        if let Some(next) = pending.pop() {

Review Comment:
   Is there an invariant that if `pending` is non-empty, then `futures.len()` prior to the loop is `CONCURRENCY_LIMIT`? I am trying to work out why only one pending future is pushed to `futures` rather than pushing while `futures.len() < CONCURRENCY_LIMIT`.
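   In other words, I would have expected to refill up to the limit, something like:
   
       // Top up the in-flight listings to the concurrency limit
       while futures.len() < CONCURRENCY_LIMIT {
           match pending.pop() {
               Some(next) => futures.push(next),
               None => break,
           }
       }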



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -152,225 +146,224 @@ pub fn split_files(
+    let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+    // TODO: Plumb this down

Review Comment:
   I think it was even worse before (the old code creates an entire `SessionContext`, right?)


