You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 03:21:48 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4391: Clean the code in `limit.rs`.

jackwener commented on code in PR #4391:
URL: https://github.com/apache/arrow-datafusion/pull/4391#discussion_r1033087940


##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -420,47 +405,52 @@ impl LimitStream {
         loop {
             let poll = input.poll_next_unpin(cx);
             let poll = poll.map_ok(|batch| {
-                if batch.num_rows() + self.current_skipped <= self.skip {
-                    self.current_skipped += batch.num_rows();
+                if batch.num_rows() <= self.skip {
+                    self.skip -= batch.num_rows();
                     RecordBatch::new_empty(input.schema())
                 } else {
-                    let offset = self.skip - self.current_skipped;
-                    let new_batch = batch.slice(offset, batch.num_rows() - offset);
-                    self.current_skipped = self.skip;
+                    let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
+                    self.skip = 0;
                     new_batch
                 }
             });
 
             match &poll {
-                Poll::Ready(Some(Ok(batch)))
-                    if batch.num_rows() > 0 && self.current_skipped == self.skip =>
-                {
-                    break poll
+                Poll::Ready(Some(Ok(batch))) => {
+                    if batch.num_rows() > 0 && self.skip == 0 {
+                        break poll;
+                    } else {
+                        // continue to poll input stream
+                    }

Review Comment:
   Clearer 👍



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -365,30 +365,17 @@ impl ExecutionPlan for LocalLimitExec {
     }
 }
 
-/// Truncate a RecordBatch to maximum of n rows
-pub fn truncate_batch(batch: &RecordBatch, n: usize) -> RecordBatch {
-    let limited_columns: Vec<ArrayRef> = (0..batch.num_columns())
-        .map(|i| limit(batch.column(i), n))
-        .collect();
-
-    RecordBatch::try_new(batch.schema(), limited_columns).unwrap()
-}
-
 /// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows.
 struct LimitStream {
-    /// The number of rows to skip
+    /// The remaining number of rows to skip
     skip: usize,

Review Comment:
   The meanings of `skip` and `fetch` have changed; we should rename them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org