You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/17 11:07:55 UTC

[arrow-datafusion] branch master updated: Fix limited statistic collection across files with no stats (#4521)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 067d0443d Fix limited statistic collection across files with no stats (#4521)
067d0443d is described below

commit 067d0443d2167cc33666c6490f383083a1c2b200
Author: Batuhan Taskaya <is...@gmail.com>
AuthorDate: Sat Dec 17 14:07:49 2022 +0300

    Fix limited statistic collection across files with no stats (#4521)
---
 datafusion/core/src/datasource/listing/table.rs  | 24 +++++++++++++++++
 datafusion/core/src/datasource/mod.rs            | 34 ++++++++++++++++++------
 datafusion/proto/src/physical_plan/from_proto.rs | 14 ++++++++--
 3 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 22fcb9216..3ea62dbd0 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -747,6 +747,30 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn load_table_stats_when_no_stats() -> Result<()> {
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())))
+            .with_collect_stat(false);
+        let schema = opt.infer_schema(&state, &table_path).await?;
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(opt)
+            .with_schema(schema);
+        let table = ListingTable::try_new(config)?;
+
+        let exec = table.scan(&state, None, &[], None).await?;
+        assert_eq!(exec.statistics().num_rows, None);
+        assert_eq!(exec.statistics().total_byte_size, None);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_try_create_output_ordering() {
         let testdata = crate::test_util::parquet_test_data();
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 8610607b6..a0d512109 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -48,7 +48,6 @@ use futures::StreamExt;
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files.
 /// Needed to read up to `limit` number of rows.
-/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
     file_schema: SchemaRef,
@@ -56,21 +55,35 @@ pub async fn get_statistics_with_limit(
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
-    let mut total_byte_size = 0;
     let mut null_counts = vec![0; file_schema.fields().len()];
     let mut has_statistics = false;
     let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
 
-    let mut num_rows = 0;
     let mut is_exact = true;
+
+    // The number of rows and the total byte size can be calculated as long as
+    // at least one file has them. If none of the files provide them, then they
+    // will be omitted from the statistics. The missing values will be counted
+    // as zero.
+    let mut num_rows = None;
+    let mut total_byte_size = None;
+
     // fusing the stream allows us to call next safely even once it is finished
     let mut all_files = Box::pin(all_files.fuse());
     while let Some(res) = all_files.next().await {
         let (file, file_stats) = res?;
         result_files.push(file);
         is_exact &= file_stats.is_exact;
-        num_rows += file_stats.num_rows.unwrap_or(0);
-        total_byte_size += file_stats.total_byte_size.unwrap_or(0);
+        num_rows = if let Some(num_rows) = num_rows {
+            Some(num_rows + file_stats.num_rows.unwrap_or(0))
+        } else {
+            file_stats.num_rows
+        };
+        total_byte_size = if let Some(total_byte_size) = total_byte_size {
+            Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
+        } else {
+            file_stats.total_byte_size
+        };
         if let Some(vec) = &file_stats.column_statistics {
             has_statistics = true;
             for (i, cs) in vec.iter().enumerate() {
@@ -103,7 +116,12 @@ pub async fn get_statistics_with_limit(
                 }
             }
         }
-        if num_rows > limit.unwrap_or(usize::MAX) {
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
             break;
         }
     }
@@ -126,8 +144,8 @@ pub async fn get_statistics_with_limit(
     };
 
     let statistics = Statistics {
-        num_rows: Some(num_rows),
-        total_byte_size: Some(total_byte_size),
+        num_rows,
+        total_byte_size,
         column_statistics: column_stats,
         is_exact,
     };
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 825399825..cd79e4206 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -362,14 +362,24 @@ impl TryInto<Statistics> for &protobuf::Statistics {
     type Error = DataFusionError;
 
     fn try_into(self) -> Result<Statistics, Self::Error> {
+        // Keep it sync with Statistics::to_proto
+        let none_value = -1_i64;
         let column_statistics = self
             .column_stats
             .iter()
             .map(|s| s.into())
             .collect::<Vec<_>>();
         Ok(Statistics {
-            num_rows: Some(self.num_rows as usize),
-            total_byte_size: Some(self.total_byte_size as usize),
+            num_rows: if self.num_rows == none_value {
+                None
+            } else {
+                Some(self.num_rows as usize)
+            },
+            total_byte_size: if self.total_byte_size == none_value {
+                None
+            } else {
+                Some(self.total_byte_size as usize)
+            },
             // No column statistic (None) is encoded with empty array
             column_statistics: if column_statistics.is_empty() {
                 None