You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/17 11:07:55 UTC
[arrow-datafusion] branch master updated: Fix limited statistic collection across files with no stats (#4521)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 067d0443d Fix limited statistic collection across files with no stats (#4521)
067d0443d is described below
commit 067d0443d2167cc33666c6490f383083a1c2b200
Author: Batuhan Taskaya <is...@gmail.com>
AuthorDate: Sat Dec 17 14:07:49 2022 +0300
Fix limited statistic collection across files with no stats (#4521)
---
datafusion/core/src/datasource/listing/table.rs | 24 +++++++++++++++++
datafusion/core/src/datasource/mod.rs | 34 ++++++++++++++++++------
datafusion/proto/src/physical_plan/from_proto.rs | 14 ++++++++--
3 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 22fcb9216..3ea62dbd0 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -747,6 +747,30 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn load_table_stats_when_no_stats() -> Result<()> {
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+ let table_path = ListingTableUrl::parse(filename).unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+
+ let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())))
+ .with_collect_stat(false);
+ let schema = opt.infer_schema(&state, &table_path).await?;
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(opt)
+ .with_schema(schema);
+ let table = ListingTable::try_new(config)?;
+
+ let exec = table.scan(&state, None, &[], None).await?;
+ assert_eq!(exec.statistics().num_rows, None);
+ assert_eq!(exec.statistics().total_byte_size, None);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_try_create_output_ordering() {
let testdata = crate::test_util::parquet_test_data();
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 8610607b6..a0d512109 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -48,7 +48,6 @@ use futures::StreamExt;
/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files.
/// Needed to read up to `limit` number of rows.
-/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
file_schema: SchemaRef,
@@ -56,21 +55,35 @@ pub async fn get_statistics_with_limit(
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
- let mut total_byte_size = 0;
let mut null_counts = vec![0; file_schema.fields().len()];
let mut has_statistics = false;
let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
- let mut num_rows = 0;
let mut is_exact = true;
+
+ // The number of rows and the total byte size are reported as long as at
+ // least one file provides them; a file that lacks a value contributes
+ // zero to the running total. If no file provides a value, the
+ // corresponding statistic stays `None` instead of being reported as zero.
+ let mut num_rows = None;
+ let mut total_byte_size = None;
+
// fusing the stream allows us to call next safely even once it is finished
let mut all_files = Box::pin(all_files.fuse());
while let Some(res) = all_files.next().await {
let (file, file_stats) = res?;
result_files.push(file);
is_exact &= file_stats.is_exact;
- num_rows += file_stats.num_rows.unwrap_or(0);
- total_byte_size += file_stats.total_byte_size.unwrap_or(0);
+ num_rows = if let Some(num_rows) = num_rows {
+ Some(num_rows + file_stats.num_rows.unwrap_or(0))
+ } else {
+ file_stats.num_rows
+ };
+ total_byte_size = if let Some(total_byte_size) = total_byte_size {
+ Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
+ } else {
+ file_stats.total_byte_size
+ };
if let Some(vec) = &file_stats.column_statistics {
has_statistics = true;
for (i, cs) in vec.iter().enumerate() {
@@ -103,7 +116,12 @@ pub async fn get_statistics_with_limit(
}
}
}
- if num_rows > limit.unwrap_or(usize::MAX) {
+
+ // If the number of rows seen so far exceeds the limit, we can stop
+ // processing files. This only applies when the number of rows is known:
+ // an unknown row count (`None`) is treated as zero here, so the check
+ // never triggers an early exit and every file is kept.
+ if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
break;
}
}
@@ -126,8 +144,8 @@ pub async fn get_statistics_with_limit(
};
let statistics = Statistics {
- num_rows: Some(num_rows),
- total_byte_size: Some(total_byte_size),
+ num_rows,
+ total_byte_size,
column_statistics: column_stats,
is_exact,
};
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 825399825..cd79e4206 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -362,14 +362,24 @@ impl TryInto<Statistics> for &protobuf::Statistics {
type Error = DataFusionError;
fn try_into(self) -> Result<Statistics, Self::Error> {
+ // Keep this in sync with Statistics::to_proto: -1 is decoded as a missing (`None`) value
+ let none_value = -1_i64;
let column_statistics = self
.column_stats
.iter()
.map(|s| s.into())
.collect::<Vec<_>>();
Ok(Statistics {
- num_rows: Some(self.num_rows as usize),
- total_byte_size: Some(self.total_byte_size as usize),
+ num_rows: if self.num_rows == none_value {
+ None
+ } else {
+ Some(self.num_rows as usize)
+ },
+ total_byte_size: if self.total_byte_size == none_value {
+ None
+ } else {
+ Some(self.total_byte_size as usize)
+ },
// No column statistic (None) is encoded with empty array
column_statistics: if column_statistics.is_empty() {
None