You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/31 21:58:49 UTC
[arrow-datafusion] branch master updated: Bug fix: Empty Record Batch handling (#5131)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d59b6dd56 Bug fix: Empty Record Batch handling (#5131)
d59b6dd56 is described below
commit d59b6dd563e3a903fae62606371e1b6f3eda53dc
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Feb 1 00:58:43 2023 +0300
Bug fix: Empty Record Batch handling (#5131)
* Add empty batch check
* add new test
* formatting fix
---------
Co-authored-by: Mustafa Akur <ak...@gmail.com>
---
.../src/physical_plan/windows/window_agg_exec.rs | 6 ++---
datafusion/core/tests/sql/window.rs | 29 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index c1f708726..a667f0a3c 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -329,13 +329,11 @@ impl WindowAggStream {
fn compute_aggregates(&self) -> Result<RecordBatch> {
// record compute time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
-
- if self.batches.is_empty() {
+ let batch = concat_batches(&self.input.schema(), &self.batches)?;
+ if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(self.schema.clone()));
}
- let batch = concat_batches(&self.input.schema(), &self.batches)?;
-
let partition_by_sort_keys = self
.partition_by_sort_keys
.iter()
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 08d6f1b32..da682abb8 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -2385,6 +2385,35 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
Ok(())
}
+#[tokio::test]
+async fn test_window_agg_low_cardinality() -> Result<()> {
+ let config = SessionConfig::new().with_target_partitions(32);
+ let ctx = SessionContext::with_config(config);
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT
+ SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as summation1,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation2
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5";
+
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+------------+-------------+",
+ "| summation1 | summation2 |",
+ "+------------+-------------+",
+ "| -16110 | 61035129 |",
+ "| 3917 | -108973366 |",
+ "| -16974 | 623103518 |",
+ "| -1114 | -1927628110 |",
+ "| 15673 | -1899175111 |",
+ "+------------+-------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+
+ Ok(())
+}
+
fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
let ts_field = Field::new("ts", DataType::Int32, false);
let inc_field = Field::new("inc_col", DataType::Int32, false);