You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this copy.)
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/01/25 19:56:00 UTC
[arrow-datafusion] branch master updated: Fix limit when size of batch to poll == skip/fetch value (#5066)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5238e8c97 Fix limit when size of batch to poll == skip/fetch value (#5066)
5238e8c97 is described below
commit 5238e8c97f998b4d2cb9fab85fb182f325a1a7fb
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Wed Jan 25 20:55:54 2023 +0100
Fix limit when size of batch to poll == skip/fetch value (#5066)
* Fix limit when size of batch to poll == fetch value
* recover test
* recover test
* Fix comment
* Don't try to fix skip for now
Co-authored-by: Daniël Heres <da...@coralogix.com>
---
datafusion/core/src/physical_plan/limit.rs | 33 +++++++++++++++++++++++++++++-
1 file changed, 32 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 3fa900ca1..1b97089f7 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -449,7 +449,8 @@ impl LimitStream {
if self.fetch == 0 {
self.input = None; // clear input so it can be dropped early
None
- } else if batch.num_rows() <= self.fetch {
+ } else if batch.num_rows() < self.fetch {
+ //
self.fetch -= batch.num_rows();
Some(batch)
} else {
@@ -571,6 +572,36 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn limit_equals_batch_size() -> Result<()> {
+ let batches = vec![
+ test::make_partition(6),
+ test::make_partition(6),
+ test::make_partition(6),
+ ];
+ let input = test::exec::TestStream::new(batches);
+
+ let index = input.index();
+ assert_eq!(index.value(), 0);
+
+ // limit of six needs to consume the entire first record batch
+ // (6 rows) and stop immediately
+ let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let limit_stream =
+ LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
+ assert_eq!(index.value(), 0);
+
+ let results = collect(Box::pin(limit_stream)).await.unwrap();
+ let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+ // Only 6 rows should have been produced
+ assert_eq!(num_rows, 6);
+
+ // Only the first batch should be consumed
+ assert_eq!(index.value(), 1);
+
+ Ok(())
+ }
+
// test cases for "skip"
async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
let session_ctx = SessionContext::new();