You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/02 15:15:42 UTC

[arrow-datafusion] branch master updated: If statistics of column Max/Min value do not exist in parquet file, set Min/Max to None (#2671)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3eb1cc5 If statistics of column Max/Min value do not exist in parquet file, set Min/Max to None (#2671)
4b3eb1cc5 is described below

commit 4b3eb1cc592c9ee306b8f60845632a66eb7eddba
Author: AssHero <hu...@gmail.com>
AuthorDate: Thu Jun 2 23:15:36 2022 +0800

    If statistics of column Max/Min value do not exist in parquet file, set Min/Max to None (#2671)
    
    * If statistics of column Max/Min value do not exist in parquet file, set the Max/Min Accumulator to None.
    
    * add more data types for parquet max/min test case
---
 .../core/src/datasource/file_format/parquet.rs     |  24 ++++-
 datafusion/core/src/datasource/mod.rs              |   4 +
 datafusion/core/tests/sql/parquet.rs               | 100 ++++++++++++++++++++-
 3 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 42bff573c..a86d5e545 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -161,8 +161,11 @@ fn summarize_min_max(
                             }
                         }
                     }
+                    return;
                 }
             }
+            max_values[i] = None;
+            min_values[i] = None;
         }
         ParquetStatistics::Int32(s) => {
             if let DataType::Int32 = fields[i].data_type() {
@@ -189,8 +192,11 @@ fn summarize_min_max(
                             }
                         }
                     }
+                    return;
                 }
             }
+            max_values[i] = None;
+            min_values[i] = None;
         }
         ParquetStatistics::Int64(s) => {
             if let DataType::Int64 = fields[i].data_type() {
@@ -217,8 +223,11 @@ fn summarize_min_max(
                             }
                         }
                     }
+                    return;
                 }
             }
+            max_values[i] = None;
+            min_values[i] = None;
         }
         ParquetStatistics::Float(s) => {
             if let DataType::Float32 = fields[i].data_type() {
@@ -243,8 +252,11 @@ fn summarize_min_max(
                             }
                         }
                     }
+                    return;
                 }
             }
+            max_values[i] = None;
+            min_values[i] = None;
         }
         ParquetStatistics::Double(s) => {
             if let DataType::Float64 = fields[i].data_type() {
@@ -269,10 +281,16 @@ fn summarize_min_max(
                             }
                         }
                     }
+                    return;
                 }
             }
+            max_values[i] = None;
+            min_values[i] = None;
+        }
+        _ => {
+            max_values[i] = None;
+            min_values[i] = None;
         }
-        _ => {}
     }
 }
 
@@ -344,6 +362,10 @@ fn fetch_statistics(
                             table_idx,
                             stats,
                         )
+                    } else {
+                        // If no statistics exist for the current column, set the Max/Min Accumulator to None.
+                        max_values[table_idx] = None;
+                        min_values[table_idx] = None;
                     }
                 } else {
                     *null_cnt += num_rows as usize;
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 65fc2adcb..8cb40cd27 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -78,6 +78,8 @@ pub async fn get_statistics_with_limit(
                                 max_values[i] = None;
                             }
                         }
+                    } else {
+                        max_values[i] = None;
                     }
                 }
 
@@ -89,6 +91,8 @@ pub async fn get_statistics_with_limit(
                                 min_values[i] = None;
                             }
                         }
+                    } else {
+                        min_values[i] = None;
                     }
                 }
             }
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 7c1bb7c43..2fef0bfe9 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -219,13 +219,105 @@ async fn schema_merge_ignores_metadata() {
     // (no errors)
     let ctx = SessionContext::new();
     let df = ctx
-        .read_parquet(
-            table_dir.to_str().unwrap().to_string(),
-            ParquetReadOptions::default(),
-        )
+        .read_parquet(table_dir.to_str().unwrap(), ParquetReadOptions::default())
         .await
         .unwrap();
     let result = df.collect().await.unwrap();
 
     assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
 }
+
+#[tokio::test]
+async fn parquet_query_with_max_min() {
+    let tmp_dir = TempDir::new().unwrap();
+    let table_dir = tmp_dir.path().join("parquet_test");
+    let table_path = Path::new(&table_dir);
+
+    let fields = vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Utf8, true),
+        Field::new("c3", DataType::Int64, true),
+        Field::new("c4", DataType::Date32, true),
+    ];
+
+    let schema = Arc::new(Schema::new(fields.clone()));
+
+    if let Ok(()) = fs::create_dir(table_path) {
+        let filename = "foo.parquet";
+        let path = table_path.join(&filename);
+        let file = fs::File::create(path).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
+                .unwrap();
+
+        // create mock record batch
+        let c1s = Arc::new(Int32Array::from_slice(&[1, 2, 3]));
+        let c2s = Arc::new(StringArray::from_slice(&["aaa", "bbb", "ccc"]));
+        let c3s = Arc::new(Int64Array::from_slice(&[100, 200, 300]));
+        let c4s = Arc::new(Date32Array::from(vec![Some(1), Some(2), Some(3)]));
+        let rec_batch =
+            RecordBatch::try_new(schema.clone(), vec![c1s, c2s, c3s, c4s]).unwrap();
+
+        writer.write(&rec_batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    // query parquet
+    let ctx = SessionContext::new();
+
+    ctx.register_parquet(
+        "foo",
+        &format!("{}/foo.parquet", table_dir.to_str().unwrap()),
+        ParquetReadOptions::default(),
+    )
+    .await
+    .unwrap();
+
+    let sql = "SELECT max(c1) FROM foo";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------------+",
+        "| MAX(foo.c1) |",
+        "+-------------+",
+        "| 3           |",
+        "+-------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT min(c2) FROM foo";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------------+",
+        "| MIN(foo.c2) |",
+        "+-------------+",
+        "| aaa         |",
+        "+-------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT max(c3) FROM foo";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------------+",
+        "| MAX(foo.c3) |",
+        "+-------------+",
+        "| 300         |",
+        "+-------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT min(c4) FROM foo";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------------+",
+        "| MIN(foo.c4) |",
+        "+-------------+",
+        "| 1970-01-02  |",
+        "+-------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}