You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/01/25 19:56:00 UTC

[arrow-datafusion] branch master updated: Fix limit when size of batch to poll == skip/fetch value (#5066)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5238e8c97 Fix limit when size of batch to poll == skip/fetch value (#5066)
5238e8c97 is described below

commit 5238e8c97f998b4d2cb9fab85fb182f325a1a7fb
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Wed Jan 25 20:55:54 2023 +0100

    Fix limit when size of batch to poll == skip/fetch value (#5066)
    
    * Fix limit when size of batch to poll == batch size
    
    * recover test
    
    * recover test
    
    * Fix comment
    
    * Don't try fix skip for now
    
    Co-authored-by: Daniël Heres <da...@coralogix.com>
---
 datafusion/core/src/physical_plan/limit.rs | 33 +++++++++++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 3fa900ca1..1b97089f7 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -449,7 +449,8 @@ impl LimitStream {
         if self.fetch == 0 {
             self.input = None; // clear input so it can be dropped early
             None
-        } else if batch.num_rows() <= self.fetch {
+        } else if batch.num_rows() < self.fetch {
+            // `<`, not `<=`: a batch of exactly `fetch` rows must take the else branch so polling stops
             self.fetch -= batch.num_rows();
             Some(batch)
         } else {
@@ -571,6 +572,36 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn limit_equals_batch_size() -> Result<()> {
+        let batches = vec![
+            test::make_partition(6),
+            test::make_partition(6),
+            test::make_partition(6),
+        ];
+        let input = test::exec::TestStream::new(batches);
+
+        let index = input.index();
+        assert_eq!(index.value(), 0);
+
+        // a limit of six equals the size of the first record batch (6 rows), so the
+        // stream must emit that whole batch and stop without pulling another one
+        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let limit_stream =
+            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
+        assert_eq!(index.value(), 0);
+
+        let results = collect(Box::pin(limit_stream)).await.unwrap();
+        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+        // Exactly the 6 rows allowed by the limit should have been produced
+        assert_eq!(num_rows, 6);
+
+        // Only the first of the three input batches should have been consumed
+        assert_eq!(index.value(), 1);
+
+        Ok(())
+    }
+
     // test cases for "skip"
     async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
         let session_ctx = SessionContext::new();