You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/24 18:06:10 UTC
[arrow-datafusion] branch master updated: Support multiple paths for ListingTableScanNode (#2775)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 858f9f16c Support multiple paths for ListingTableScanNode (#2775)
858f9f16c is described below
commit 858f9f16c670cfa7b11b3d234c5dbdee1fda8c1f
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat Jun 25 02:06:06 2022 +0800
Support multiple paths for ListingTableScanNode (#2775)
* Support multiple paths for ListingTableScanNode
* add test
---
datafusion/core/src/datasource/listing/table.rs | 188 ++++++++++++++++++++----
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/logical_plan.rs | 20 ++-
3 files changed, 178 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 29cde3c9f..1b0807085 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use futures::{future, stream, StreamExt, TryStreamExt};
use crate::datasource::{
file_format::{
@@ -50,8 +50,9 @@ use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_file
/// Configuration for creating a 'ListingTable'
pub struct ListingTableConfig {
- /// Path on the `ObjectStore` for creating `ListingTable`.
- pub table_path: ListingTableUrl,
+ /// Paths on the `ObjectStore` for creating `ListingTable`.
+ /// They should share the same schema and object store.
+ pub table_paths: Vec<ListingTableUrl>,
/// Optional `SchemaRef` for the to be created `ListingTable`.
pub file_schema: Option<SchemaRef>,
/// Optional `ListingOptions` for the to be created `ListingTable`.
@@ -59,10 +60,22 @@ pub struct ListingTableConfig {
}
impl ListingTableConfig {
- /// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
+ /// Creates new `ListingTableConfig`.
+ /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
pub fn new(table_path: ListingTableUrl) -> Self {
+ let table_paths = vec![table_path];
Self {
- table_path,
+ table_paths,
+ file_schema: None,
+ options: None,
+ }
+ }
+
+ /// Creates new `ListingTableConfig` with multiple table paths.
+ /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_paths` first element.
+ pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
+ Self {
+ table_paths,
file_schema: None,
options: None,
}
@@ -70,7 +83,7 @@ impl ListingTableConfig {
/// Add `schema` to `ListingTableConfig`
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
- table_path: self.table_path,
+ table_paths: self.table_paths,
file_schema: Some(schema),
options: self.options,
}
@@ -79,7 +92,7 @@ impl ListingTableConfig {
/// Add `listing_options` to `ListingTableConfig`
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
Self {
- table_path: self.table_path,
+ table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
}
@@ -100,10 +113,14 @@ impl ListingTableConfig {
/// Infer `ListingOptions` based on `table_path` suffix.
pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
- let store = ctx.runtime_env.object_store(&self.table_path)?;
+ let store = ctx
+ .runtime_env
+ .object_store(&self.table_paths.get(0).unwrap())?;
let file = self
- .table_path
+ .table_paths
+ .get(0)
+ .unwrap()
.list_all_files(store.as_ref(), "")
.next()
.await
@@ -124,7 +141,7 @@ impl ListingTableConfig {
};
Ok(Self {
- table_path: self.table_path,
+ table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
})
@@ -134,10 +151,12 @@ impl ListingTableConfig {
pub async fn infer_schema(self, ctx: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
- let schema = options.infer_schema(ctx, &self.table_path).await?;
+ let schema = options
+ .infer_schema(ctx, self.table_paths.get(0).unwrap())
+ .await?;
Ok(Self {
- table_path: self.table_path,
+ table_paths: self.table_paths,
file_schema: Some(schema),
options: Some(options),
})
@@ -221,7 +240,7 @@ impl ListingOptions {
/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
- table_path: ListingTableUrl,
+ table_paths: Vec<ListingTableUrl>,
/// File fields only
file_schema: SchemaRef,
/// File fields + partition columns
@@ -257,7 +276,7 @@ impl ListingTable {
}
let table = Self {
- table_path: config.table_path,
+ table_paths: config.table_paths,
file_schema,
table_schema: Arc::new(Schema::new(table_fields)),
options,
@@ -266,9 +285,9 @@ impl ListingTable {
Ok(table)
}
- /// Get path ref
- pub fn table_path(&self) -> &ListingTableUrl {
- &self.table_path
+ /// Get paths ref
+ pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
+ &self.table_paths
}
/// Get options ref
@@ -313,7 +332,7 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
FileScanConfig {
- object_store_url: self.table_path.object_store(),
+ object_store_url: self.table_paths.get(0).unwrap().object_store(),
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
@@ -351,17 +370,23 @@ impl ListingTable {
filters: &'a [Expr],
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
- let store = ctx.runtime_env.object_store(&self.table_path)?;
+ let store = ctx
+ .runtime_env
+ .object_store(&self.table_paths.get(0).unwrap())?;
// list files (with partitions)
- let file_list = pruned_partition_list(
- store.as_ref(),
- &self.table_path,
- filters,
- &self.options.file_extension,
- &self.options.table_partition_cols,
- )
+ let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
+ pruned_partition_list(
+ store.as_ref(),
+ table_path,
+ filters,
+ &self.options.file_extension,
+ &self.options.table_partition_cols,
+ )
+ }))
.await?;
+ let file_list = stream::iter(file_list).flatten();
+
// collect the statistics if required by the config
// TODO: Collect statistics and schema in single-pass
let files = file_list.then(|part_file| async {
@@ -553,6 +578,77 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_assert_list_files_for_multi_path() -> Result<()> {
+ // Object store layout shared by the non-empty cases below:
+ // key1 holds 3 files, key2 holds 2 files, key3 holds 1 file.
+ let files = [
+ "bucket/key1/file0",
+ "bucket/key1/file1",
+ "bucket/key1/file2",
+ "bucket/key2/file3",
+ "bucket/key2/file4",
+ "bucket/key3/file5",
+ ];
+
+ // more target partitions than files: key1 + key2 list 5 files,
+ // so 12 target partitions still yield 5 groups
+ assert_list_files_for_multi_paths(
+ &files,
+ &["test:///bucket/key1/", "test:///bucket/key2/"],
+ 12,
+ 5,
+ )
+ .await?;
+
+ // as many target partitions as files listed from key1 + key2
+ assert_list_files_for_multi_paths(
+ &files,
+ &["test:///bucket/key1/", "test:///bucket/key2/"],
+ 5,
+ 5,
+ )
+ .await?;
+
+ // more files than target partitions: key1 alone lists 3 files into 2 groups
+ assert_list_files_for_multi_paths(&files, &["test:///bucket/key1/"], 2, 2).await?;
+
+ // no files => no groups
+ assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0).await?;
+
+ // only files under the listed prefix are picked up: key3 holds a single file
+ assert_list_files_for_multi_paths(&files, &["test:///bucket/key3/"], 2, 1).await?;
+
+ Ok(())
+ }
+
async fn load_table(
ctx: &SessionContext,
name: &str,
@@ -604,4 +700,44 @@ mod tests {
Ok(())
}
+
+ /// Check that a `ListingTable` built from all of the `table_prefix` paths
+ /// lists its files into `output_partitioning` groups when the object store
+ /// contains `files` and the table uses `target_partitions`.
+ ///
+ /// Returns an error (failing the calling test) if any prefix is not a valid
+ /// table URL, the table cannot be created, or listing fails.
+ async fn assert_list_files_for_multi_paths(
+ files: &[&str],
+ table_prefix: &[&str],
+ target_partitions: usize,
+ output_partitioning: usize,
+ ) -> Result<()> {
+ let ctx = SessionContext::new();
+ register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
+
+ let opt = ListingOptions {
+ file_extension: "".to_owned(),
+ format: Arc::new(AvroFormat {}),
+ table_partition_cols: vec![],
+ target_partitions,
+ collect_stat: true,
+ };
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+ // Propagate malformed URLs as a test error rather than panicking mid-iteration.
+ let table_paths = table_prefix
+ .iter()
+ .map(|t| ListingTableUrl::parse(t))
+ .collect::<Result<Vec<_>>>()?;
+ let config = ListingTableConfig::new_with_multi_paths(table_paths)
+ .with_listing_options(opt)
+ .with_schema(Arc::new(schema));
+
+ let table = ListingTable::try_new(config)?;
+
+ let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+
+ assert_eq!(
+ file_list.len(),
+ output_partitioning,
+ "unexpected number of file groups for paths {:?}",
+ table_prefix
+ );
+
+ Ok(())
+ }
}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index dffc8ec2f..e12760d9a 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -93,7 +93,7 @@ message AvroFormat {}
message ListingTableScanNode {
string table_name = 1;
- string path = 2;
+ repeated string paths = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
datafusion.Schema schema = 5;
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index c1ee0ab05..1850d57fd 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -410,7 +410,12 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
};
- let table_path = ListingTableUrl::parse(&scan.path)?;
+ // let table_path = ListingTableUrl::parse(&scan.paths)?;
+ let table_paths = &scan
+ .paths
+ .iter()
+ .map(|p| ListingTableUrl::parse(p).unwrap())
+ .collect::<Vec<ListingTableUrl>>();
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
@@ -419,9 +424,10 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};
- let config = ListingTableConfig::new(table_path)
- .with_listing_options(options)
- .with_schema(Arc::new(schema));
+ let config =
+ ListingTableConfig::new_with_multi_paths(table_paths.clone())
+ .with_listing_options(options)
+ .with_schema(Arc::new(schema));
let provider = ListingTable::try_new(config)?;
@@ -758,7 +764,11 @@ impl AsLogicalPlan for LogicalPlanNode {
.options()
.table_partition_cols
.clone(),
- path: listing_table.table_path().to_string(),
+ paths: listing_table
+ .table_paths()
+ .iter()
+ .map(|x| x.to_string())
+ .collect(),
schema: Some(schema),
projection,
filters,