You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/02 15:15:42 UTC
[arrow-datafusion] branch master updated: If the Max/Min statistics for a column do not exist in the parquet file, set Min/Max to None (#2671)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4b3eb1cc5 If statistics of column Max/Min value does not exists in parquet file, sent Min/Max to None (#2671)
4b3eb1cc5 is described below
commit 4b3eb1cc592c9ee306b8f60845632a66eb7eddba
Author: AssHero <hu...@gmail.com>
AuthorDate: Thu Jun 2 23:15:36 2022 +0800
If the Max/Min statistics for a column do not exist in the parquet file, set Min/Max to None (#2671)
* If the Max/Min statistics for a column do not exist in the parquet file, set the Max/Min accumulator to None.
* add more data types for parquet max/min test case
---
.../core/src/datasource/file_format/parquet.rs | 24 ++++-
datafusion/core/src/datasource/mod.rs | 4 +
datafusion/core/tests/sql/parquet.rs | 100 ++++++++++++++++++++-
3 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 42bff573c..a86d5e545 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -161,8 +161,11 @@ fn summarize_min_max(
}
}
}
+ return;
}
}
+ max_values[i] = None;
+ min_values[i] = None;
}
ParquetStatistics::Int32(s) => {
if let DataType::Int32 = fields[i].data_type() {
@@ -189,8 +192,11 @@ fn summarize_min_max(
}
}
}
+ return;
}
}
+ max_values[i] = None;
+ min_values[i] = None;
}
ParquetStatistics::Int64(s) => {
if let DataType::Int64 = fields[i].data_type() {
@@ -217,8 +223,11 @@ fn summarize_min_max(
}
}
}
+ return;
}
}
+ max_values[i] = None;
+ min_values[i] = None;
}
ParquetStatistics::Float(s) => {
if let DataType::Float32 = fields[i].data_type() {
@@ -243,8 +252,11 @@ fn summarize_min_max(
}
}
}
+ return;
}
}
+ max_values[i] = None;
+ min_values[i] = None;
}
ParquetStatistics::Double(s) => {
if let DataType::Float64 = fields[i].data_type() {
@@ -269,10 +281,16 @@ fn summarize_min_max(
}
}
}
+ return;
}
}
+ max_values[i] = None;
+ min_values[i] = None;
+ }
+ _ => {
+ max_values[i] = None;
+ min_values[i] = None;
}
- _ => {}
}
}
@@ -344,6 +362,10 @@ fn fetch_statistics(
table_idx,
stats,
)
+ } else {
+ // If no statistics exist for the current column, set the Max/Min accumulator to None.
+ max_values[table_idx] = None;
+ min_values[table_idx] = None;
}
} else {
*null_cnt += num_rows as usize;
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 65fc2adcb..8cb40cd27 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -78,6 +78,8 @@ pub async fn get_statistics_with_limit(
max_values[i] = None;
}
}
+ } else {
+ max_values[i] = None;
}
}
@@ -89,6 +91,8 @@ pub async fn get_statistics_with_limit(
min_values[i] = None;
}
}
+ } else {
+ min_values[i] = None;
}
}
}
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 7c1bb7c43..2fef0bfe9 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -219,13 +219,105 @@ async fn schema_merge_ignores_metadata() {
// (no errors)
let ctx = SessionContext::new();
let df = ctx
- .read_parquet(
- table_dir.to_str().unwrap().to_string(),
- ParquetReadOptions::default(),
- )
+ .read_parquet(table_dir.to_str().unwrap(), ParquetReadOptions::default())
.await
.unwrap();
let result = df.collect().await.unwrap();
assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}
+
+#[tokio::test]
+async fn parquet_query_with_max_min() {
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+ let table_path = Path::new(&table_dir);
+
+ let fields = vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Utf8, true),
+ Field::new("c3", DataType::Int64, true),
+ Field::new("c4", DataType::Date32, true),
+ ];
+
+ let schema = Arc::new(Schema::new(fields.clone()));
+
+ if let Ok(()) = fs::create_dir(table_path) {
+ let filename = "foo.parquet";
+ let path = table_path.join(&filename);
+ let file = fs::File::create(path).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
+ .unwrap();
+
+ // create mock record batch
+ let c1s = Arc::new(Int32Array::from_slice(&[1, 2, 3]));
+ let c2s = Arc::new(StringArray::from_slice(&["aaa", "bbb", "ccc"]));
+ let c3s = Arc::new(Int64Array::from_slice(&[100, 200, 300]));
+ let c4s = Arc::new(Date32Array::from(vec![Some(1), Some(2), Some(3)]));
+ let rec_batch =
+ RecordBatch::try_new(schema.clone(), vec![c1s, c2s, c3s, c4s]).unwrap();
+
+ writer.write(&rec_batch).unwrap();
+ writer.close().unwrap();
+ }
+
+ // query parquet
+ let ctx = SessionContext::new();
+
+ ctx.register_parquet(
+ "foo",
+ &format!("{}/foo.parquet", table_dir.to_str().unwrap()),
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+
+ let sql = "SELECT max(c1) FROM foo";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------------+",
+ "| MAX(foo.c1) |",
+ "+-------------+",
+ "| 3 |",
+ "+-------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT min(c2) FROM foo";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------------+",
+ "| MIN(foo.c2) |",
+ "+-------------+",
+ "| aaa |",
+ "+-------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT max(c3) FROM foo";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------------+",
+ "| MAX(foo.c3) |",
+ "+-------------+",
+ "| 300 |",
+ "+-------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT min(c4) FROM foo";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------------+",
+ "| MIN(foo.c4) |",
+ "+-------------+",
+ "| 1970-01-02 |",
+ "+-------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+}