Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/10 09:27:08 UTC

[arrow-datafusion] branch master updated: Fix date32 and date64 parquet row group pruning, tests for same (#690)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d24567  Fix date32 and date64 parquet row group pruning, tests for same (#690)
7d24567 is described below

commit 7d2456743470142753d940f1829db5258e417c27
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 10 05:27:01 2021 -0400

    Fix date32 and date64 parquet row group pruning, tests for same (#690)
---
 datafusion/src/physical_plan/expressions/binary.rs |   3 +
 datafusion/src/scalar.rs                           |   3 +
 datafusion/tests/parquet_pruning.rs                | 210 ++++++++++++++++++---
 3 files changed, 193 insertions(+), 23 deletions(-)
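
The gist of the fix: Arrow's Date32 stores a date as i32 days since the UNIX epoch and Date64 as i64 milliseconds since the epoch, and row group pruning evaluates the filter against each row group's min/max statistics as ScalarValues. The diff below therefore adds a Date64 arm to the scalar comparison kernel and Date32/Date64 handling to ScalarValue. A minimal sketch, not taken from this commit, of building a Date64 pruning predicate the same way the new prune_date64 test does (date64_lt is a hypothetical helper):

    use chrono::NaiveDate;
    use datafusion::logical_plan::{col, lit, Expr};
    use datafusion::scalar::ScalarValue;

    /// Hypothetical helper: build `column < <ymd>` against a Date64 column,
    /// since the SQL planner cannot yet coerce a Date32 literal to Date64.
    fn date64_lt(column: &str, ymd: &str) -> Expr {
        let millis = ymd
            .parse::<NaiveDate>()
            .unwrap()
            .and_hms(0, 0, 0) // midnight, matching the test's and_time(...)
            .timestamp_millis();
        col(column).lt(lit(ScalarValue::Date64(Some(millis))))
    }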

diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index 102b701..3394113 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -269,6 +269,9 @@ macro_rules! binary_array_op_scalar {
             DataType::Date32 => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
             }
+            DataType::Date64 => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, Date64Array)
+            }
             other => Err(DataFusionError::Internal(format!(
                 "Data type {:?} not supported for scalar operation on dyn array",
                 other
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index c23674b..f94a090 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -900,6 +900,7 @@ impl TryFrom<ScalarValue> for i64 {
     fn try_from(value: ScalarValue) -> Result<Self> {
         match value {
             ScalarValue::Int64(Some(inner_value))
+            | ScalarValue::Date64(Some(inner_value))
             | ScalarValue::TimestampNanosecond(Some(inner_value))
             | ScalarValue::TimestampMicrosecond(Some(inner_value))
             | ScalarValue::TimestampMillisecond(Some(inner_value))
@@ -939,6 +940,8 @@ impl TryFrom<&DataType> for ScalarValue {
             DataType::UInt64 => ScalarValue::UInt64(None),
             DataType::Utf8 => ScalarValue::Utf8(None),
             DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
+            DataType::Date32 => ScalarValue::Date32(None),
+            DataType::Date64 => ScalarValue::Date64(None),
             DataType::Timestamp(TimeUnit::Second, _) => {
                 ScalarValue::TimestampSecond(None)
             }
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index f5486af..8ad7974 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -21,17 +21,20 @@ use std::sync::Arc;
 
 use arrow::{
     array::{
-        Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-        TimestampNanosecondArray, TimestampSecondArray,
+        Array, Date32Array, Date64Array, StringArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
     },
     datatypes::{Field, Schema},
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use chrono::Duration;
+use chrono::{Datelike, Duration};
 use datafusion::{
+    datasource::{parquet::ParquetTable, TableProvider},
+    logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder},
     physical_plan::{plan_metrics, SQLMetric},
     prelude::ExecutionContext,
+    scalar::ScalarValue,
 };
 use hashbrown::HashMap;
 use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
@@ -39,7 +42,7 @@ use tempfile::NamedTempFile;
 
 #[tokio::test]
 async fn prune_timestamps_nanos() {
-    let output = ContextWithParquet::new()
+    let output = ContextWithParquet::new(Scenario::Timestamps)
         .await
         .query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
         .await;
@@ -52,7 +55,7 @@ async fn prune_timestamps_nanos() {
 
 #[tokio::test]
 async fn prune_timestamps_micros() {
-    let output = ContextWithParquet::new()
+    let output = ContextWithParquet::new(Scenario::Timestamps)
         .await
         .query(
             "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
@@ -67,7 +70,7 @@ async fn prune_timestamps_micros() {
 
 #[tokio::test]
 async fn prune_timestamps_millis() {
-    let output = ContextWithParquet::new()
+    let output = ContextWithParquet::new(Scenario::Timestamps)
         .await
         .query(
             "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')",
@@ -82,7 +85,7 @@ async fn prune_timestamps_millis() {
 
 #[tokio::test]
 async fn prune_timestamps_seconds() {
-    let output = ContextWithParquet::new()
+    let output = ContextWithParquet::new(Scenario::Timestamps)
         .await
         .query(
             "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
@@ -95,15 +98,60 @@ async fn prune_timestamps_seconds() {
     assert_eq!(output.result_rows, 10, "{}", output.description());
 }
 
+#[tokio::test]
+async fn prune_date32() {
+    let output = ContextWithParquet::new(Scenario::Dates)
+        .await
+        .query("SELECT * FROM t where date32 < cast('2020-01-02' as date)")
+        .await;
+    println!("{}", output.description());
+    // This predicate should prune out row groups without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(3));
+    assert_eq!(output.result_rows, 1, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_date64() {
+    // Workaround for not being able to cast Date32 to Date64 automatically
+    let date = "2020-01-02"
+        .parse::<chrono::NaiveDate>()
+        .unwrap()
+        .and_time(chrono::NaiveTime::from_hms(0, 0, 0));
+    let date = ScalarValue::Date64(Some(date.timestamp_millis()));
+
+    let output = ContextWithParquet::new(Scenario::Dates)
+        .await
+        .query_with_expr(col("date64").lt(lit(date)))
+        // .query(
+        //     "SELECT * FROM t where date64 < caste('2020-01-02' as date)",
+        // this query fails to plan with Plan("'Date64 < Date32' can't be evaluated because there isn't a common type to coerce the types to")
+        // )
+        .await;
+
+    println!("{}", output.description());
+    // This predicate should prune out row groups without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(3));
+    assert_eq!(output.result_rows, 1, "{}", output.description());
+}
+
 // ----------------------
 // Begin test fixture
 // ----------------------
 
+/// What data to use
+enum Scenario {
+    Timestamps,
+    Dates,
+}
+
 /// Test fixture that has an execution context that has an external
 /// table "t" registered, pointing at a parquet file made with
 /// `make_test_file`
 struct ContextWithParquet {
     file: NamedTempFile,
+    provider: Arc<dyn TableProvider>,
     ctx: ExecutionContext,
 }
 
@@ -156,24 +204,54 @@ impl TestOutput {
 
 /// Creates an execution context that has an external table "t"
 /// registered pointing at a parquet file made with `make_test_file`
+/// and the appropriate scenario
 impl ContextWithParquet {
-    async fn new() -> Self {
-        let file = make_test_file().await;
+    async fn new(scenario: Scenario) -> Self {
+        let file = make_test_file(scenario).await;
 
         // now, set up the file as a data source and run a query against it
         let mut ctx = ExecutionContext::new();
         let parquet_path = file.path().to_string_lossy();
-        ctx.register_parquet("t", &parquet_path)
-            .expect("registering");
 
-        Self { file, ctx }
+        let table = ParquetTable::try_new(parquet_path, 4).unwrap();
+
+        let provider = Arc::new(table);
+        ctx.register_table("t", provider.clone()).unwrap();
+
+        Self {
+            file,
+            provider,
+            ctx,
+        }
+    }
+
+    /// Runs a query like "SELECT * from t WHERE <expr>" and returns
+    /// the number of output rows and normalized execution metrics
+    async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
+        let sql = format!("EXPR only: {:?}", expr);
+        let logical_plan = LogicalPlanBuilder::scan("t", self.provider.clone(), None)
+            .unwrap()
+            .filter(expr)
+            .unwrap()
+            .build()
+            .unwrap();
+        self.run_test(logical_plan, sql).await
     }
 
     /// Runs the specified SQL query and returns the number of output
     /// rows and normalized execution metrics
     async fn query(&mut self, sql: &str) -> TestOutput {
         println!("Planning sql {}", sql);
+        let logical_plan = self.ctx.sql(sql).expect("planning").to_logical_plan();
+        self.run_test(logical_plan, sql).await
+    }
 
+    /// Runs the given logical plan and returns the output row count and normalized metrics
+    async fn run_test(
+        &mut self,
+        logical_plan: LogicalPlan,
+        sql: impl Into<String>,
+    ) -> TestOutput {
         let input = self
             .ctx
             .sql("SELECT * from t")
@@ -183,8 +261,6 @@ impl ContextWithParquet {
             .expect("getting input");
         let pretty_input = pretty_format_batches(&input).unwrap();
 
-        let logical_plan = self.ctx.sql(sql).expect("planning").to_logical_plan();
-
         let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
         let execution_plan = self
             .ctx
@@ -210,7 +286,7 @@ impl ContextWithParquet {
 
         let pretty_results = pretty_format_batches(&results).unwrap();
 
-        let sql = sql.to_string();
+        let sql = sql.into();
         TestOutput {
             sql,
             metrics,
@@ -222,7 +298,7 @@ impl ContextWithParquet {
 }
 
 /// Create a test parquet file with various data types
-async fn make_test_file() -> NamedTempFile {
+async fn make_test_file(scenario: Scenario) -> NamedTempFile {
     let output_file = tempfile::Builder::new()
         .prefix("parquet_pruning")
         .suffix(".parquet")
@@ -233,12 +309,25 @@ async fn make_test_file() -> NamedTempFile {
         .set_max_row_group_size(5)
         .build();
 
-    let batches = vec![
-        make_batch(Duration::seconds(0)),
-        make_batch(Duration::seconds(10)),
-        make_batch(Duration::minutes(10)),
-        make_batch(Duration::days(10)),
-    ];
+    let batches = match scenario {
+        Scenario::Timestamps => {
+            vec![
+                make_timestamp_batch(Duration::seconds(0)),
+                make_timestamp_batch(Duration::seconds(10)),
+                make_timestamp_batch(Duration::minutes(10)),
+                make_timestamp_batch(Duration::days(10)),
+            ]
+        }
+        Scenario::Dates => {
+            vec![
+                make_date_batch(Duration::days(0)),
+                make_date_batch(Duration::days(10)),
+                make_date_batch(Duration::days(300)),
+                make_date_batch(Duration::days(3600)),
+            ]
+        }
+    };
+
     let schema = batches[0].schema();
 
     let mut writer = ArrowWriter::try_new(
@@ -268,7 +357,7 @@ async fn make_test_file() -> NamedTempFile {
 /// "millis" --> TimestampMillisecondArray
 /// "seconds" --> TimestampSecondArray
 /// "names" --> StringArray
-pub fn make_batch(offset: Duration) -> RecordBatch {
+fn make_timestamp_batch(offset: Duration) -> RecordBatch {
     let ts_strings = vec![
         Some("2020-01-01T01:01:01.0000000000001"),
         Some("2020-01-01T01:02:01.0000000000001"),
@@ -341,3 +430,78 @@ pub fn make_batch(offset: Duration) -> RecordBatch {
     )
     .unwrap()
 }
+
+/// Returns a record batch with a few rows of data for all of the supported date
+/// types with the specified offset (in days)
+///
+/// Columns are named:
+/// "date32" --> Date32Array
+/// "date64" --> Date64Array
+/// "names" --> StringArray
+fn make_date_batch(offset: Duration) -> RecordBatch {
+    let date_strings = vec![
+        Some("2020-01-01"),
+        Some("2020-01-02"),
+        Some("2020-01-03"),
+        None,
+        Some("2020-01-04"),
+    ];
+
+    let names = date_strings
+        .iter()
+        .enumerate()
+        .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val))
+        .collect::<Vec<_>>();
+
+    // Copied from `cast.rs` cast kernel due to lack of temporal kernels
+    // https://github.com/apache/arrow-rs/issues/527
+    const EPOCH_DAYS_FROM_CE: i32 = 719_163;
+
+    let date_seconds = date_strings
+        .iter()
+        .map(|t| {
+            t.map(|t| {
+                let t = t.parse::<chrono::NaiveDate>().unwrap();
+                let t = t + offset;
+                t.num_days_from_ce() - EPOCH_DAYS_FROM_CE
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let date_millis = date_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                let t = t
+                    .parse::<chrono::NaiveDate>()
+                    .unwrap()
+                    .and_time(chrono::NaiveTime::from_hms(0, 0, 0));
+                let t = t + offset;
+                t.timestamp_millis()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let arr_date32 = Date32Array::from(date_seconds);
+    let arr_date64 = Date64Array::from(date_millis);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("date32", arr_date32.data_type().clone(), true),
+        Field::new("date64", arr_date64.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_date32),
+            Arc::new(arr_date64),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
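
For reference, the Date32 conversion in make_date_batch relies on chrono's num_days_from_ce() returning 719_163 for the UNIX epoch (1970-01-01), so subtracting EPOCH_DAYS_FROM_CE yields days since the epoch, which is exactly the Date32 encoding. A quick standalone check, not part of this commit:

    use chrono::{Datelike, NaiveDate};

    fn main() {
        // chrono's proleptic-Gregorian day count for the UNIX epoch
        let epoch = NaiveDate::from_ymd(1970, 1, 1);
        assert_eq!(epoch.num_days_from_ce(), 719_163);

        // 2020-01-02 is 18_263 days after the epoch, i.e. its Date32 value
        let d = NaiveDate::from_ymd(2020, 1, 2);
        assert_eq!(d.num_days_from_ce() - 719_163, 18_263);
    }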