Posted to github@arrow.apache.org by "yyy1000 (via GitHub)" <gi...@apache.org> on 2024/03/05 17:07:10 UTC

[PR] Implement spilling for PartialSortExec [arrow-datafusion]

yyy1000 opened a new pull request, #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469

   ## Which issue does this PR close?
   
   Hopefully it will.
   Closes #9153 and #9170.
   
   ## Rationale for this change
   
   See the issues
   
   ## What changes are included in this PR?
   
   This refactor uses `ExternalSorter` to implement `PartialSort`, since
   `ExternalSorter` already implements spilling.
   
   ## Are these changes tested?
   
   Yes, a new test case for spilling is added.
   
   ## Are there any user-facing changes?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1514490681


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   Our original aim in `PartialSortExec` was to produce streaming results when the existing ordering and the required ordering share a common prefix. If that is the case, `PartialSortExec` can produce results chunk by chunk (i.e. not as a single bulk). In this code snippet the data is fed in chunk by chunk; however, `.sort` is called only once, after the `while` loop ends. For this reason, the data is still produced as a single bulk.
   
   I think this is the reason the existing tests fail with this implementation.
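   
   Below is a minimal, standalone Rust sketch of the chunk-by-chunk behaviour described above; it is not the PR's code. It buffers rows only until the common-prefix key advances, then sorts and emits the completed group, instead of calling `sort()` once after the input ends. The two-column Int32 schema, the single-column prefix, the helper names (`sort_group`, `last_boundary`, `partial_sort_streaming`) and the use of a plain `Vec<RecordBatch>` in place of the real streams are all illustrative assumptions.
   
   ```rust
   use std::sync::Arc;
   
   use arrow::array::{Array, ArrayRef, Int32Array};
   use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;
   
   /// Sort one emitted chunk on the full (prefix, suffix) key.
   fn sort_group(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
       let keys = vec![
           SortColumn { values: batch.column(0).clone(), options: None },
           SortColumn { values: batch.column(1).clone(), options: None },
       ];
       let indices = lexsort_to_indices(&keys, None)?;
       let columns = batch
           .columns()
           .iter()
           .map(|c| take(c.as_ref(), &indices, None))
           .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
       RecordBatch::try_new(batch.schema(), columns)
   }
   
   /// Index of the first row of the last (possibly still open) prefix group.
   fn last_boundary(prefix: &Int32Array) -> usize {
       (1..prefix.len())
           .rev()
           .find(|&i| prefix.value(i) != prefix.value(i - 1))
           .unwrap_or(0)
   }
   
   /// Emit sorted chunks as soon as a prefix group is complete,
   /// instead of one global sort after the input ends.
   fn partial_sort_streaming(
       schema: &SchemaRef,
       input: Vec<RecordBatch>,
   ) -> Result<Vec<RecordBatch>, ArrowError> {
       let mut output = Vec::new();
       let mut buffer: Option<RecordBatch> = None;
       for batch in input {
           // Join the still-open group from the previous batch with the new rows.
           let combined = match buffer.take() {
               Some(prev) => concat_batches(schema, [&prev, &batch])?,
               None => batch,
           };
           if combined.num_rows() == 0 {
               continue;
           }
           let prefix = combined
               .column(0)
               .as_any()
               .downcast_ref::<Int32Array>()
               .expect("sketch assumes an Int32 prefix column");
           let cut = last_boundary(prefix);
           if cut > 0 {
               // Rows before `cut` belong to finished prefix groups: sort and emit now.
               output.push(sort_group(&combined.slice(0, cut))?);
           }
           // Rows from `cut` on may share their prefix with later input: keep buffering.
           buffer = Some(combined.slice(cut, combined.num_rows() - cut));
       }
       if let Some(rest) = buffer {
           output.push(sort_group(&rest)?);
       }
       Ok(output)
   }
   
   fn main() -> Result<(), ArrowError> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::Int32, false), // common prefix column
           Field::new("b", DataType::Int32, false), // suffix sort key
       ]));
       let b1 = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(Int32Array::from(vec![1, 1, 2])) as ArrayRef,
               Arc::new(Int32Array::from(vec![3, 1, 5])) as ArrayRef,
           ],
       )?;
       let b2 = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(Int32Array::from(vec![2, 3])) as ArrayRef,
               Arc::new(Int32Array::from(vec![4, 0])) as ArrayRef,
           ],
       )?;
       // The a = 1 group is emitted right after the first batch; a = 2 waits for the second.
       for chunk in partial_sort_streaming(&schema, vec![b1, b2])? {
           println!("emitted chunk with {} rows", chunk.num_rows());
       }
       Ok(())
   }
   ```
   
   In this toy input the a = 1 group can be emitted as soon as the first batch has been processed, while the open a = 2 group waits for the second batch; the real operator additionally has to respect `fetch` and the memory reservation.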





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1513195857


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -563,7 +507,7 @@ mod tests {
                 "| 1 | 3 | 0 |",
                 "+---+---+---+",
             ];
-            assert_eq!(2, result.len());
+            assert_eq!(1, result.len());

Review Comment:
   I think the sort in `ExternalSorter` will merge the two `RecordBatch`es into one if there is no spilling?



##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -510,6 +398,62 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_partial_sort_spill() -> Result<()> {

Review Comment:
   This is the new test case for spilling; I mainly copied it from sort.rs.



##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -647,157 +591,6 @@ mod tests {
         Ok(())
     }
 
-    fn prepare_partitioned_input() -> Arc<dyn ExecutionPlan> {

Review Comment:
   I deleted these test cases for the same reason: the sort in `ExternalSorter` will merge the `RecordBatch`es, so I think these test cases are no longer necessary?



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -309,6 +310,66 @@ impl ExternalSorter {
         Ok(())
     }
 
+    fn get_slice_point(
+        &self,
+        common_prefix_len: usize,
+        batch: &RecordBatch,
+    ) -> Result<Option<usize>> {
+        let common_prefix_sort_keys = (0..common_prefix_len)
+            .map(|idx| self.expr[idx].evaluate_to_sort_column(batch))
+            .collect::<Result<Vec<_>>>()?;
+        let partition_points =
+            evaluate_partition_ranges(batch.num_rows(), &common_prefix_sort_keys)?;
+        // If partition points are [0..100], [100..200], [200..300]
+        // we should return 200, which is the safest and furthest partition boundary
+        // Please note that we shouldn't return 300 (which is number of rows in the batch),
+        // because this boundary may change with new data.
+        if partition_points.len() >= 2 {
+            Ok(Some(partition_points[partition_points.len() - 2].end))
+        } else {
+            Ok(None)
+        }
+    }
+    pub(crate) async fn insert_batch_with_prefix(
+        &mut self,
+        input: RecordBatch,
+        prefix: usize,
+    ) -> Result<()> {
+        if input.num_rows() == 0 {
+            return Ok(());
+        }
+        self.reserve_memory_for_merge()?;
+
+        let size = input.get_array_memory_size();
+        if self.reservation.try_grow(size).is_err() {
+            let before = self.reservation.size();
+            self.in_mem_sort().await?;
+            // Sorting may have freed memory, especially if fetch is `Some`
+            //
+            // As such we check again, and if the memory usage has dropped by
+            // a factor of 2, and we can allocate the necessary capacity,
+            // we don't spill
+            //
+            // The factor of 2 aims to avoid a degenerate case where the
+            // memory required for `fetch` is just under the memory available,
+            // causing repeated re-sorting of data
+            if self.reservation.size() > before / 2
+                || self.reservation.try_grow(size).is_err()
+            {
+                self.spill().await?;
+                self.reservation.try_grow(size)?
+            }
+        }
+        if let Some(slice_point) = self.get_slice_point(prefix, &input)? {
+            self.in_mem_batches.push(input.slice(0, slice_point));
+            self.in_mem_batches
+                .push(input.slice(slice_point, input.num_rows() - slice_point));
+        } else {
+            self.in_mem_batches.push(input);
+        }
+        self.in_mem_batches_sorted = false;
+        Ok(())
+    }

Review Comment:
   Here it also first calls `get_slice_point` and then pushes the resulting slices into `in_mem_batches`.
   One question I have: in the past, the 2nd `RecordBatch` would be concatenated to the remainder of the 1st `RecordBatch`, but I didn't implement that here. 🤔 I don't know whether it's necessary.
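   
   For illustration, here is a small standalone sketch (not the PR's code nor the previous `PartialSortStream` implementation; the helper name is hypothetical) of the concatenation this comment refers to: the rows after the slice point stay buffered and are joined with the next incoming batch, so a prefix group that spans batch boundaries stays contiguous before it is sorted.
   
   ```rust
   use arrow::compute::concat_batches;
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;
   
   fn merge_with_leftover(
       leftover: Option<RecordBatch>, // rows after the previous slice point
       next: RecordBatch,             // freshly arrived batch
   ) -> Result<RecordBatch, ArrowError> {
       match leftover {
           // Join the buffered tail with the new rows before slicing/sorting again.
           Some(tail) => concat_batches(&next.schema(), [&tail, &next]),
           None => Ok(next),
       }
   }
   ```
   
   Whether this is still needed once `ExternalSorter`'s merge step interleaves the two pushed slices is exactly the open question here.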





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#issuecomment-1987731498

   > Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?
   > 
   > I agree it can happen in theory, but I was wondering if anyone has seen it happening in the real world
   
   I also think it is quite unlikely.




Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1514744904


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   Got it. Thanks!
   Do you have any suggestions? I think calling `sort_in_mem_batches` from the original implementation after each `insert_batch_with_prefix` could solve this, with the spilling logic added to `sort_in_mem_batches`. 🤔





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#issuecomment-1987307145

   > Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?
   
   No, I don't have one either. I just tested the spilling function using the test from Sort. 👀
   
   




Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1514791456


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   I also have a question: `sort` returns a `SendableRecordBatchStream` and produces its output as a single bulk, while `PartialSort` returns it chunk by chunk. What are the pros of chunk by chunk? I think they are the same in terms of memory usage. 🤔





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#issuecomment-1986805564

   Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?
   
   I agree it can happen in theory, but I was wondering if anyone has seen it happening in the real world




Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#issuecomment-1979279090

   The CI fails because PartialSort now replaces Sort in some test cases, like the one below. I can update those expectations if the rest of this PR looks good.
   ```
   expected:
   
   [
       "SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
       "  SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
       "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
       "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], has_header=false",
   ]
   actual:
   
   [
       "SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
       "  PartialSortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC], common_prefix_length=[2]",
       "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
       "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], has_header=false",
   ]
   ```




Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "yyy1000 (via GitHub)" <gi...@apache.org>.
yyy1000 commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1516348574


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   Yeah, thanks for the info.
   I think in this PR, PartialSort can also reduce memory usage: it also emits data chunk by chunk but returns a merged stream at the end. Let's see how others view this.





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1516052258


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   > Got it. Thanks! Do you have any suggestions? I think calling `sort_in_mem_batches` from the original implementation after each `insert_batch_with_prefix` could solve this, with the spilling logic added to `sort_in_mem_batches`. 🤔
   
   I'm not really sure how to implement it. Unfortunately, I am not super familiar with the `ExternalSorter` code.





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1516050811


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -282,16 +276,31 @@ impl ExecutionPlan for PartialSortExec {
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
         assert!(self.common_prefix_length > 0);
+        let execution_options = &context.session_config().options().execution;
 
-        Ok(Box::pin(PartialSortStream {
-            input,
-            expr: self.expr.clone(),
-            common_prefix_length: self.common_prefix_length,
-            in_mem_batches: vec![],
-            fetch: self.fetch,
-            is_closed: false,
-            baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
-        }))
+        let mut sorter = crate::sorts::sort::ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+        let prefix = self.common_prefix_length;
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch_with_prefix(batch, prefix).await?;
+                }
+                sorter.sort()

Review Comment:
   Actually, the memory usage of PartialSort is less than the classic Sort, since it consumes data chunk by chunk and emits it chunk by chunk. It can prune data from its buffer whenever the common prefix changes, so it only ever buffers the rows that share the current prefix value rather than the whole input.





Re: [PR] Implement spilling for PartialSortExec [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#issuecomment-2056610159

   Marking as draft, as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look.

