You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/31 21:58:49 UTC

[arrow-datafusion] branch master updated: Bug fix: Empty Record Batch handling (#5131)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d59b6dd56 Bug fix: Empty Record Batch handling (#5131)
d59b6dd56 is described below

commit d59b6dd563e3a903fae62606371e1b6f3eda53dc
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Feb 1 00:58:43 2023 +0300

    Bug fix: Empty Record Batch handling (#5131)
    
    * Add empty batch check
    
    * add new test
    
    * formatting fix
    
    ---------
    
    Co-authored-by: Mustafa Akur <ak...@gmail.com>
---
 .../src/physical_plan/windows/window_agg_exec.rs   |  6 ++---
 datafusion/core/tests/sql/window.rs                | 29 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index c1f708726..a667f0a3c 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -329,13 +329,11 @@ impl WindowAggStream {
     fn compute_aggregates(&self) -> Result<RecordBatch> {
         // record compute time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
-
-        if self.batches.is_empty() {
+        let batch = concat_batches(&self.input.schema(), &self.batches)?;
+        if batch.num_rows() == 0 {
             return Ok(RecordBatch::new_empty(self.schema.clone()));
         }
 
-        let batch = concat_batches(&self.input.schema(), &self.batches)?;
-
         let partition_by_sort_keys = self
             .partition_by_sort_keys
             .iter()
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 08d6f1b32..da682abb8 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -2385,6 +2385,35 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_low_cardinality() -> Result<()> {
+    let config = SessionConfig::new().with_target_partitions(32);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as summation1,
+        SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5";
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+------------+-------------+",
+        "| summation1 | summation2  |",
+        "+------------+-------------+",
+        "| -16110     | 61035129    |",
+        "| 3917       | -108973366  |",
+        "| -16974     | 623103518   |",
+        "| -1114      | -1927628110 |",
+        "| 15673      | -1899175111 |",
+        "+------------+-------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    Ok(())
+}
+
 fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
     let ts_field = Field::new("ts", DataType::Int32, false);
     let inc_field = Field::new("inc_col", DataType::Int32, false);