You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/24 18:06:10 UTC

[arrow-datafusion] branch master updated: Support multiple paths for ListingTableScanNode (#2775)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 858f9f16c Support multiple paths for ListingTableScanNode (#2775)
858f9f16c is described below

commit 858f9f16c670cfa7b11b3d234c5dbdee1fda8c1f
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sat Jun 25 02:06:06 2022 +0800

    Support multiple paths for ListingTableScanNode (#2775)
    
    * Support multiple paths for ListingTableScanNode
    
    * add test
---
 datafusion/core/src/datasource/listing/table.rs | 188 ++++++++++++++++++++----
 datafusion/proto/proto/datafusion.proto         |   2 +-
 datafusion/proto/src/logical_plan.rs            |  20 ++-
 3 files changed, 178 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 29cde3c9f..1b0807085 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use futures::{future, stream, StreamExt, TryStreamExt};
 
 use crate::datasource::{
     file_format::{
@@ -50,8 +50,9 @@ use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_file
 
 /// Configuration for creating a 'ListingTable'  
 pub struct ListingTableConfig {
-    /// Path on the `ObjectStore` for creating `ListingTable`.
-    pub table_path: ListingTableUrl,
+    /// Paths on the `ObjectStore` for creating `ListingTable`.
+    /// They should share the same schema and object store.
+    pub table_paths: Vec<ListingTableUrl>,
     /// Optional `SchemaRef` for the to be created `ListingTable`.
     pub file_schema: Option<SchemaRef>,
     /// Optional `ListingOptions` for the to be created `ListingTable`.
@@ -59,10 +60,22 @@ pub struct ListingTableConfig {
 }
 
 impl ListingTableConfig {
-    /// Creates new `ListingTableConfig`.  The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
+    /// Creates new `ListingTableConfig`.
+    /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
     pub fn new(table_path: ListingTableUrl) -> Self {
+        let table_paths = vec![table_path];
         Self {
-            table_path,
+            table_paths,
+            file_schema: None,
+            options: None,
+        }
+    }
+
+    /// Creates new `ListingTableConfig` with multiple table paths.
+    /// The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the first element of the provided `table_paths`.
+    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
+        Self {
+            table_paths,
             file_schema: None,
             options: None,
         }
@@ -70,7 +83,7 @@ impl ListingTableConfig {
     /// Add `schema` to `ListingTableConfig`
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
-            table_path: self.table_path,
+            table_paths: self.table_paths,
             file_schema: Some(schema),
             options: self.options,
         }
@@ -79,7 +92,7 @@ impl ListingTableConfig {
     /// Add `listing_options` to `ListingTableConfig`
     pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
         Self {
-            table_path: self.table_path,
+            table_paths: self.table_paths,
             file_schema: self.file_schema,
             options: Some(listing_options),
         }
@@ -100,10 +113,14 @@ impl ListingTableConfig {
 
     /// Infer `ListingOptions` based on `table_path` suffix.
     pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
-        let store = ctx.runtime_env.object_store(&self.table_path)?;
+        let store = ctx
+            .runtime_env
+            .object_store(&self.table_paths.get(0).unwrap())?;
 
         let file = self
-            .table_path
+            .table_paths
+            .get(0)
+            .unwrap()
             .list_all_files(store.as_ref(), "")
             .next()
             .await
@@ -124,7 +141,7 @@ impl ListingTableConfig {
         };
 
         Ok(Self {
-            table_path: self.table_path,
+            table_paths: self.table_paths,
             file_schema: self.file_schema,
             options: Some(listing_options),
         })
@@ -134,10 +151,12 @@ impl ListingTableConfig {
     pub async fn infer_schema(self, ctx: &SessionState) -> Result<Self> {
         match self.options {
             Some(options) => {
-                let schema = options.infer_schema(ctx, &self.table_path).await?;
+                let schema = options
+                    .infer_schema(ctx, self.table_paths.get(0).unwrap())
+                    .await?;
 
                 Ok(Self {
-                    table_path: self.table_path,
+                    table_paths: self.table_paths,
                     file_schema: Some(schema),
                     options: Some(options),
                 })
@@ -221,7 +240,7 @@ impl ListingOptions {
 /// An implementation of `TableProvider` that uses the object store
 /// or file system listing capability to get the list of files.
 pub struct ListingTable {
-    table_path: ListingTableUrl,
+    table_paths: Vec<ListingTableUrl>,
     /// File fields only
     file_schema: SchemaRef,
     /// File fields + partition columns
@@ -257,7 +276,7 @@ impl ListingTable {
         }
 
         let table = Self {
-            table_path: config.table_path,
+            table_paths: config.table_paths,
             file_schema,
             table_schema: Arc::new(Schema::new(table_fields)),
             options,
@@ -266,9 +285,9 @@ impl ListingTable {
         Ok(table)
     }
 
-    /// Get path ref
-    pub fn table_path(&self) -> &ListingTableUrl {
-        &self.table_path
+    /// Get paths ref
+    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
+        &self.table_paths
     }
 
     /// Get options ref
@@ -313,7 +332,7 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store_url: self.table_path.object_store(),
+                    object_store_url: self.table_paths.get(0).unwrap().object_store(),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
                     statistics,
@@ -351,17 +370,23 @@ impl ListingTable {
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
-        let store = ctx.runtime_env.object_store(&self.table_path)?;
+        let store = ctx
+            .runtime_env
+            .object_store(&self.table_paths.get(0).unwrap())?;
         // list files (with partitions)
-        let file_list = pruned_partition_list(
-            store.as_ref(),
-            &self.table_path,
-            filters,
-            &self.options.file_extension,
-            &self.options.table_partition_cols,
-        )
+        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
+            pruned_partition_list(
+                store.as_ref(),
+                table_path,
+                filters,
+                &self.options.file_extension,
+                &self.options.table_partition_cols,
+            )
+        }))
         .await?;
 
+        let file_list = stream::iter(file_list).flatten();
+
         // collect the statistics if required by the config
         // TODO: Collect statistics and schema in single-pass
         let files = file_list.then(|part_file| async {
@@ -553,6 +578,77 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_assert_list_files_for_multi_path() -> Result<()> {
+        // more expected partitions than files
+        assert_list_files_for_multi_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+                "bucket/key3/file5",
+            ],
+            &["test:///bucket/key1/", "test:///bucket/key2/"],
+            12,
+            5,
+        )
+        .await?;
+
+        // as many expected partitions as files
+        assert_list_files_for_multi_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+                "bucket/key3/file5",
+            ],
+            &["test:///bucket/key1/", "test:///bucket/key2/"],
+            5,
+            5,
+        )
+        .await?;
+
+        // more files than expected partitions
+        assert_list_files_for_multi_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+                "bucket/key3/file5",
+            ],
+            &["test:///bucket/key1/"],
+            2,
+            2,
+        )
+        .await?;
+
+        // no files => no groups
+        assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0).await?;
+
+        // files that don't match the prefix
+        assert_list_files_for_multi_paths(
+            &[
+                "bucket/key1/file0",
+                "bucket/key1/file1",
+                "bucket/key1/file2",
+                "bucket/key2/file3",
+                "bucket/key2/file4",
+                "bucket/key3/file5",
+            ],
+            &["test:///bucket/key3/"],
+            2,
+            1,
+        )
+        .await?;
+        Ok(())
+    }
+
     async fn load_table(
         ctx: &SessionContext,
         name: &str,
@@ -604,4 +700,44 @@ mod tests {
 
         Ok(())
     }
+
+    /// Check that the files listed by the table match the specified `output_partitioning`
+    /// when the object store contains `files`.
+    async fn assert_list_files_for_multi_paths(
+        files: &[&str],
+        table_prefix: &[&str],
+        target_partitions: usize,
+        output_partitioning: usize,
+    ) -> Result<()> {
+        let ctx = SessionContext::new();
+        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
+
+        let format = AvroFormat {};
+
+        let opt = ListingOptions {
+            file_extension: "".to_owned(),
+            format: Arc::new(format),
+            table_partition_cols: vec![],
+            target_partitions,
+            collect_stat: true,
+        };
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+        let table_paths = table_prefix
+            .iter()
+            .map(|t| ListingTableUrl::parse(t).unwrap())
+            .collect();
+        let config = ListingTableConfig::new_with_multi_paths(table_paths)
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema));
+
+        let table = ListingTable::try_new(config)?;
+
+        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+
+        assert_eq!(file_list.len(), output_partitioning);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index dffc8ec2f..e12760d9a 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -93,7 +93,7 @@ message AvroFormat {}
 
 message ListingTableScanNode {
   string table_name = 1;
-  string path = 2;
+  repeated string paths = 2;
   string file_extension = 3;
   ProjectionColumns projection = 4;
   datafusion.Schema schema = 5;
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index c1ee0ab05..1850d57fd 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -410,7 +410,12 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
                     };
 
-                let table_path = ListingTableUrl::parse(&scan.path)?;
+                // let table_path = ListingTableUrl::parse(&scan.paths)?;
+                let table_paths = &scan
+                    .paths
+                    .iter()
+                    .map(|p| ListingTableUrl::parse(p).unwrap())
+                    .collect::<Vec<ListingTableUrl>>();
                 let options = ListingOptions {
                     file_extension: scan.file_extension.clone(),
                     format: file_format,
@@ -419,9 +424,10 @@ impl AsLogicalPlan for LogicalPlanNode {
                     target_partitions: scan.target_partitions as usize,
                 };
 
-                let config = ListingTableConfig::new(table_path)
-                    .with_listing_options(options)
-                    .with_schema(Arc::new(schema));
+                let config =
+                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
+                        .with_listing_options(options)
+                        .with_schema(Arc::new(schema));
 
                 let provider = ListingTable::try_new(config)?;
 
@@ -758,7 +764,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                                     .options()
                                     .table_partition_cols
                                     .clone(),
-                                path: listing_table.table_path().to_string(),
+                                paths: listing_table
+                                    .table_paths()
+                                    .iter()
+                                    .map(|x| x.to_string())
+                                    .collect(),
                                 schema: Some(schema),
                                 projection,
                                 filters,