You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/04/29 20:11:50 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #6163: Adaptive in-memory sort (~2x faster) (#5879)

tustvold opened a new pull request, #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #5879 #5230
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   ```
   merge sorted i64        time:   [4.7568 ms 4.7616 ms 4.7669 ms]
                           change: [+2.1068% +2.3410% +2.5729%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 7 outliers among 100 measurements (7.00%)
     4 (4.00%) high mild
     3 (3.00%) high severe
   
   sort merge i64          time:   [4.7577 ms 4.7668 ms 4.7756 ms]
                           change: [-3.8818% -3.5709% -3.2819%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 10 outliers among 100 measurements (10.00%)
     8 (8.00%) low mild
     2 (2.00%) high mild
   
   sort i64                time:   [3.5604 ms 3.5695 ms 3.5787 ms]
                           change: [-47.559% -47.338% -47.118%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   
   sort partitioned i64    time:   [189.26 µs 191.22 µs 193.29 µs]
                           change: [-67.019% -44.594% -9.0868%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 9 outliers among 100 measurements (9.00%)
     9 (9.00%) high severe
   
   merge sorted f64        time:   [4.9416 ms 4.9477 ms 4.9549 ms]
                           change: [+1.0481% +1.2864% +1.5176%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 5 outliers among 100 measurements (5.00%)
     2 (2.00%) high mild
     3 (3.00%) high severe
   
   sort merge f64          time:   [4.9364 ms 4.9466 ms 4.9567 ms]
                           change: [-4.5491% -4.2663% -3.9838%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort f64                time:   [4.5859 ms 4.6085 ms 4.6335 ms]
                           change: [-41.877% -41.536% -41.182%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 13 outliers among 100 measurements (13.00%)
     3 (3.00%) high mild
     10 (10.00%) high severe
   
   sort partitioned f64    time:   [193.25 µs 194.84 µs 196.60 µs]
                           change: [-66.364% -49.820% -24.707%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 7 outliers among 100 measurements (7.00%)
     1 (1.00%) high mild
     6 (6.00%) high severe
   
   merge sorted utf8 low cardinality
                           time:   [5.2695 ms 5.2795 ms 5.2905 ms]
                           change: [+0.1782% +0.5459% +0.8824%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   Found 8 outliers among 100 measurements (8.00%)
     3 (3.00%) high mild
     5 (5.00%) high severe
   
   sort merge utf8 low cardinality
                           time:   [5.4749 ms 5.4859 ms 5.4968 ms]
                           change: [-9.5832% -9.1290% -8.6668%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort utf8 low cardinality
                           time:   [10.327 ms 10.352 ms 10.379 ms]
                           change: [+12.671% +13.176% +13.689%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 10 outliers among 100 measurements (10.00%)
     10 (10.00%) high mild
   
   sort partitioned utf8 low cardinality
                           time:   [380.82 µs 390.19 µs 400.01 µs]
                           change: [-58.561% -51.365% -41.516%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 4 outliers among 100 measurements (4.00%)
     1 (1.00%) high mild
     3 (3.00%) high severe
   
   merge sorted utf8 high cardinality
                           time:   [8.0970 ms 8.1371 ms 8.1811 ms]
                           change: [+1.7218% +2.4401% +3.1634%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 12 outliers among 100 measurements (12.00%)
     4 (4.00%) high mild
     8 (8.00%) high severe
   
   sort merge utf8 high cardinality
                           time:   [8.2608 ms 8.2747 ms 8.2892 ms]
                           change: [-10.764% -10.105% -9.4476%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high mild
   
   sort utf8 high cardinality
                           time:   [14.481 ms 14.601 ms 14.726 ms]
                           change: [-24.598% -23.812% -23.012%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   
   sort partitioned utf8 high cardinality
                           time:   [457.02 µs 466.43 µs 475.95 µs]
                           change: [-60.870% -50.562% -36.727%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 6 outliers among 100 measurements (6.00%)
     1 (1.00%) high mild
     5 (5.00%) high severe
   
   merge sorted utf8 tuple time:   [14.555 ms 14.572 ms 14.591 ms]
                           change: [-0.4969% -0.2681% -0.0295%] (p = 0.02 < 0.05)
                           Change within noise threshold.
   Found 5 outliers among 100 measurements (5.00%)
     4 (4.00%) high mild
     1 (1.00%) high severe
   
   sort merge utf8 tuple   time:   [16.312 ms 16.388 ms 16.464 ms]
                           change: [-8.7208% -8.0755% -7.4565%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort utf8 tuple         time:   [27.043 ms 27.188 ms 27.334 ms]
                           change: [-59.129% -58.917% -58.688%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   Benchmarking sort partitioned utf8 tuple: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.0s, enable flat sampling, or reduce sample count to 70.
   sort partitioned utf8 tuple
                           time:   [1.0555 ms 1.0934 ms 1.1338 ms]
                           change: [-56.637% -52.161% -44.365%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 3 outliers among 100 measurements (3.00%)
     2 (2.00%) high mild
     1 (1.00%) high severe
   
   merge sorted utf8 dictionary
                           time:   [5.5580 ms 5.5623 ms 5.5667 ms]
                           change: [+2.0292% +2.2143% +2.3711%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high mild
   
   sort merge utf8 dictionary
                           time:   [5.2240 ms 5.2377 ms 5.2515 ms]
                           change: [-6.0528% -5.7105% -5.3791%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   Benchmarking sort utf8 dictionary: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.3s, enable flat sampling, or reduce sample count to 50.
   sort utf8 dictionary    time:   [1.6521 ms 1.6562 ms 1.6608 ms]
                           change: [-71.444% -71.351% -71.261%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 13 outliers among 100 measurements (13.00%)
     10 (10.00%) high mild
     3 (3.00%) high severe
   
   sort partitioned utf8 dictionary
                           time:   [191.38 µs 193.07 µs 194.76 µs]
                           change: [-67.496% -48.064% -16.445%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 8 outliers among 100 measurements (8.00%)
     2 (2.00%) high mild
     6 (6.00%) high severe
   
   merge sorted utf8 dictionary tuple
                           time:   [8.3299 ms 8.3399 ms 8.3514 ms]
                           change: [+2.1649% +2.3555% +2.5497%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 4 outliers among 100 measurements (4.00%)
     3 (3.00%) high mild
     1 (1.00%) high severe
   
   sort merge utf8 dictionary tuple
                           time:   [8.3788 ms 8.4157 ms 8.4522 ms]
                           change: [-11.462% -10.581% -9.7110%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort utf8 dictionary tuple
                           time:   [18.048 ms 18.103 ms 18.159 ms]
                           change: [-33.080% -32.800% -32.502%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort partitioned utf8 dictionary tuple
                           time:   [695.23 µs 717.90 µs 741.22 µs]
                           change: [-58.425% -50.360% -40.576%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 4 outliers among 100 measurements (4.00%)
     1 (1.00%) high mild
     3 (3.00%) high severe
   
   merge sorted mixed dictionary tuple
                           time:   [13.847 ms 13.864 ms 13.883 ms]
                           change: [-0.1635% +0.0555% +0.2611%] (p = 0.60 > 0.05)
                           No change in performance detected.
   Found 3 outliers among 100 measurements (3.00%)
     2 (2.00%) high mild
     1 (1.00%) high severe
   
   sort merge mixed dictionary tuple
                           time:   [14.056 ms 14.103 ms 14.150 ms]
                           change: [-9.5911% -9.0131% -8.4159%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort mixed dictionary tuple
                           time:   [25.585 ms 25.672 ms 25.759 ms]
                           change: [-60.308% -60.150% -59.983%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort partitioned mixed dictionary tuple
                           time:   [753.92 µs 781.26 µs 811.65 µs]
                           change: [-56.610% -50.505% -43.248%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 5 outliers among 100 measurements (5.00%)
     1 (1.00%) high mild
     4 (4.00%) high severe
   
   merge sorted mixed tuple
                           time:   [13.707 ms 13.724 ms 13.744 ms]
                           change: [-0.7392% -0.5536% -0.3534%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   Found 3 outliers among 100 measurements (3.00%)
     1 (1.00%) high mild
     2 (2.00%) high severe
   
   sort merge mixed tuple  time:   [14.666 ms 14.728 ms 14.792 ms]
                           change: [-7.0426% -6.5315% -6.0608%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high mild
   
   sort mixed tuple        time:   [22.520 ms 22.655 ms 22.792 ms]
                           change: [-27.945% -27.394% -26.833%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   sort partitioned mixed tuple
                           time:   [661.24 µs 678.44 µs 696.29 µs]
                           change: [-57.005% -49.694% -40.292%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 7 outliers among 100 measurements (7.00%)
     2 (2.00%) high mild
     5 (5.00%) high severe
   ```
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   Previously ExternalSorter would sort every batch it receives, but then perform an in-memory sort by concatenating all the inputs together. 
   
   This PR modifies ExternalSorter to not presort batches, and to instead perform an in-memory sort prior to either spilling or performing the final sort. It also adds an adaptive strategy that falls back to SortPreservingMerge based on the size of the input batches.
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1182397091


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -115,47 +113,37 @@ impl ExternalSorter {
         }
     }
 
-    async fn insert_batch(
-        &mut self,
-        input: RecordBatch,
-        tracking_metrics: &MemTrackingMetrics,
-    ) -> Result<()> {
-        if input.num_rows() > 0 {
-            let size = batch_byte_size(&input);
-            if self.reservation.try_grow(size).is_err() {
+    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
+    ///
+    /// Updates memory usage metrics, and possibly triggers spilling to disk
+    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let size = batch_byte_size(&input);
+        if self.reservation.try_grow(size).is_err() {
+            let before = self.reservation.size();
+            self.in_mem_sort().await?;
+            // Sorting may have freed memory, especially if fetch is not `None`
+            //
+            // As such we check again, and if the memory usage has dropped by
+            // a factor of 2, and we can allocate the necessary capacity,
+            // we don't spill
+            //
+            // The factor of 2 aims to avoid a degenerate case where the
+            // memory required for `fetch` is just under the memory available,
+            // causing repeated resorting of data
+            if self.reservation.size() > before / 2

Review Comment:
   > needs to be called again and return successfully
   
   It is called in the loop body following the spill



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181220770


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -205,8 +205,8 @@ mod tests {
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
             "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
             "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99    | 99    | 86          | 86          | 301  | 296  | 301   | 1004  | 305  | 305  | 301  | 301  | 1001  | 1002  | 1001   | 289    |",
+            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",

Review Comment:
   I have to confess to not really understanding what is going on in this test. I _think_ the output order was just not guaranteed, but I don't really understand what is going on with these functions :sweat_smile: 
   
   Perhaps @mustafasrepo might be able to verify this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123596


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -242,28 +214,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
+
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
+    }
+
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
+        }
+
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+        }
+
+        let streams = self
+            .in_mem_batches
+            .drain(..)
+            .map(|batch| {
+                let metrics = self.metrics_set.new_intermediate_tracking(
+                    self.partition_id,
+                    &self.runtime.memory_pool,
+                );
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+            })
+            .collect::<Result<_>>()?;
+
+        // TODO: Run batch sorts concurrently (#6162)
+        // TODO: Pushdown fetch to streaming merge (#6000)

Review Comment:
   These are some ideas for further improvements, both should be relatively straightforward to implement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123498


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -165,28 +154,18 @@ impl ExternalSorter {
 
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.session_config.batch_size();
-
         if self.spilled_before() {
             let intermediate_metrics = self
                 .metrics_set
                 .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-            let mut merge_metrics = self
+
+            let merge_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
 
             let mut streams = vec![];
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream = in_mem_partial_sort(
-                    &mut self.in_mem_batches,
-                    self.schema.clone(),
-                    &self.expr,
-                    batch_size,
-                    intermediate_metrics,
-                    self.fetch,
-                )?;
-                // TODO: More accurate, dynamic memory accounting (#5885)
-                merge_metrics.init_mem_used(self.reservation.free());

Review Comment:
   This is redundant as the mem sort stream already accounts for its memory usage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181278534


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -205,8 +205,8 @@ mod tests {
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
             "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
             "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99    | 99    | 86          | 86          | 301  | 296  | 301   | 1004  | 305  | 305  | 301  | 301  | 1001  | 1002  | 1001   | 289    |",
+            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",

Review Comment:
   Thank you for looking into



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#issuecomment-1529002495

   > can effectively utilize the cache and enhance performance
   
   At least empirically with the benchmarks I have available, the cost of sorting and therefore copying all the values far outweighs any cache locality effects.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181612991


##########
datafusion/core/tests/order_spill_fuzz.rs:
##########
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
             RecordBatch::try_from_iter(vec![(
                 "x",
                 Arc::new(Int32Array::from_iter_values(
-                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                    (0..to_read).map(|_| rng.gen()),

Review Comment:
   I recommend a comment explaining this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181124214


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -242,28 +214,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
+
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
+    }
+
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
+        }
+
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+        }
+
+        let streams = self

Review Comment:
   Thinking about this a bit more, we might want to group together groups of < 1MB to reduce the fan-in :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181264479


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
-
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
-    }
-}
-
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
-        }
-
-        let current_size = min(self.batch_size, length - self.pos);
-
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
-        }
-
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
-        self.pos += current_size;
-        Some(slices)
-    }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
-    }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
+async fn spill_sorted_batches(
     batches: Vec<RecordBatch>,

Review Comment:
   just for curiosity, why do you prefer vector instead of stream here? to avoid channels overheads?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold merged pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123596


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -242,28 +214,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
+
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
+    }
+
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
+        }
+
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+        }
+
+        let streams = self
+            .in_mem_batches
+            .drain(..)
+            .map(|batch| {
+                let metrics = self.metrics_set.new_intermediate_tracking(
+                    self.partition_id,
+                    &self.runtime.memory_pool,
+                );
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+            })
+            .collect::<Result<_>>()?;
+
+        // TODO: Run batch sorts concurrently (#6162)
+        // TODO: Pushdown fetch to streaming merge (#6000)

Review Comment:
   These are some ideas for further improvements



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123498


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -165,28 +154,18 @@ impl ExternalSorter {
 
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.session_config.batch_size();
-
         if self.spilled_before() {
             let intermediate_metrics = self
                 .metrics_set
                 .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-            let mut merge_metrics = self
+
+            let merge_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
 
             let mut streams = vec![];
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream = in_mem_partial_sort(
-                    &mut self.in_mem_batches,
-                    self.schema.clone(),
-                    &self.expr,
-                    batch_size,
-                    intermediate_metrics,
-                    self.fetch,
-                )?;
-                // TODO: More accurate, dynamic memory accounting (#5885)
-                merge_metrics.init_mem_used(self.reservation.free());

Review Comment:
   This is redundant as the mem sort stream now accounts for its memory usage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#issuecomment-1529173343

   I ran the `sort` benchmarks (shout out to @jaylmiller for adding them) on this branch https://github.com/apache/arrow-datafusion/tree/main/benchmarks#parquet-benchmarks
   
   They certainly look very promising:
   
   ```
   --------------------
   Benchmark sort.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        sort ┃       sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Qsort utf8   │  80037.05ms │ 55724.44ms │ +1.44x faster │
   │ Qsort int    │  96166.00ms │ 69435.53ms │ +1.38x faster │
   │ Qsort        │  85487.77ms │ 57109.82ms │ +1.50x faster │
   │ decimal      │             │            │               │
   │ Qsort        │ 103824.30ms │ 78792.01ms │ +1.32x faster │
   │ integer      │             │            │               │
   │ tuple        │             │            │               │
   │ Qsort utf8   │  80706.73ms │ 63156.00ms │ +1.28x faster │
   │ tuple        │             │            │               │
   │ Qsort mixed  │  97100.16ms │ 68542.10ms │ +1.42x faster │
   │ tuple        │             │            │               │
   └──────────────┴─────────────┴────────────┴───────────────┘
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123234


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -1089,7 +820,7 @@ mod tests {
     #[tokio::test]
     async fn test_sort_fetch_memory_calculation() -> Result<()> {
         // This test mirrors down the size from the example above.
-        let avg_batch_size = 5000;
+        let avg_batch_size = 4000;

Review Comment:
   I had to adjust this down slightly, as the memory characteristics have changed slightly (for the better)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1182861817


##########
datafusion/core/tests/order_spill_fuzz.rs:
##########
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
             RecordBatch::try_from_iter(vec![(
                 "x",
                 Arc::new(Int32Array::from_iter_values(
-                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                    (0..to_read).map(|_| rng.gen()),

Review Comment:
   I guess I was thinking that without a comment it could easily regress in the future 🤷  maybe the memory accounting failures would catch it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181595935


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -242,28 +213,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+

Review Comment:
   this use of `stream`s is downright beautiful. It just is so easy to read



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -115,47 +113,37 @@ impl ExternalSorter {
         }
     }
 
-    async fn insert_batch(
-        &mut self,
-        input: RecordBatch,
-        tracking_metrics: &MemTrackingMetrics,
-    ) -> Result<()> {
-        if input.num_rows() > 0 {
-            let size = batch_byte_size(&input);
-            if self.reservation.try_grow(size).is_err() {
+    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
+    ///
+    /// Updates memory usage metrics, and possibly triggers spilling to disk
+    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let size = batch_byte_size(&input);
+        if self.reservation.try_grow(size).is_err() {
+            let before = self.reservation.size();
+            self.in_mem_sort().await?;
+            // Sorting may have freed memory, especially if fetch is not `None`
+            //
+            // As such we check again, and if the memory usage has dropped by
+            // a factor of 2, and we can allocate the necessary capacity,
+            // we don't spill
+            //
+            // The factor of 2 aims to avoid a degenerate case where the
+            // memory required for `fetch` is just under the memory available,
+            // causing repeated resorting of data
+            if self.reservation.size() > before / 2

Review Comment:
   in the case where `self.reservation.size() > before / 2` is true,  (and we have freed a bunch of memory and skop the spilling) it seems like `self.reservation_try_grow(size)` needs to be called again and return successfully
   
   Maybe we could just reverse the order of this check (`self.reservation.try_grow(size).is_err()` || `self.reservation.size() > before / 2`)  and add a comment noting it relies on the side effects
   



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {

Review Comment:
   This merging uses the `streaming_merge` implementation now, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "yjshen (via GitHub)" <gi...@apache.org>.
yjshen commented on PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#issuecomment-1528916786

   > Previously ExternalSorter would sort every batch it receives, but then perform an in-memory sort by concatenating all the inputs together.
   >
   > This PR modifies ExternalSorter to not presort batches
   
   I think that sorting the batch while it is still in the cache, immediately after its generation, can effectively utilize the cache and enhance performance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181278833


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
-
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
-    }
-}
-
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
-        }
-
-        let current_size = min(self.batch_size, length - self.pos);
-
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
-        }
-
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
-        self.pos += current_size;
-        Some(slices)
-    }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
-    }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
+async fn spill_sorted_batches(
     batches: Vec<RecordBatch>,

Review Comment:
   I switched to an in-place sort so that we can potentially avoid needing to spill - https://github.com/apache/arrow-datafusion/pull/6163/files#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR269
   
   Given this, the stream was unnecessary added complexity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1182399065


##########
datafusion/core/tests/order_spill_fuzz.rs:
##########
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
             RecordBatch::try_from_iter(vec![(
                 "x",
                 Arc::new(Int32Array::from_iter_values(
-                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                    (0..to_read).map(|_| rng.gen()),

Review Comment:
   It seems strange to explain what was previously just an implementation bug, I think it is fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181123596


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -242,28 +214,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
+
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
+    }
+
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
+        }
+
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+        }
+
+        let streams = self
+            .in_mem_batches
+            .drain(..)
+            .map(|batch| {
+                let metrics = self.metrics_set.new_intermediate_tracking(
+                    self.partition_id,
+                    &self.runtime.memory_pool,
+                );
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+            })
+            .collect::<Result<_>>()?;
+
+        // TODO: Run batch sorts concurrently (#6162)
+        // TODO: Pushdown fetch to streaming merge (#6000)

Review Comment:
   These are some ideas for further improvements, both should be relatively straightforward to implement
   
   #6162 is potentially a big win, it will effectively get us parallel merge sort



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181278833


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
-
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
-    }
-}
-
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
-        }
-
-        let current_size = min(self.batch_size, length - self.pos);
-
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
-        }
-
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
-        self.pos += current_size;
-        Some(slices)
-    }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
-    }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
+async fn spill_sorted_batches(
     batches: Vec<RecordBatch>,

Review Comment:
   I switched to an in-place sort so that we can potentially avoid needing to spill - https://github.com/apache/arrow-datafusion/pull/6163/files#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR128
   
   Given this, the stream was unnecessary added complexity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181214739


##########
datafusion/core/tests/order_spill_fuzz.rs:
##########
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
             RecordBatch::try_from_iter(vec![(
                 "x",
                 Arc::new(Int32Array::from_iter_values(
-                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                    (0..to_read).map(|_| rng.gen()),

Review Comment:
   This seemingly innocuous change is necessary because the previous formulation results in a size hint of 0, resulting in bump allocation of the Int32Array. This in turn causes it to be larger than it should be. Previously as we sorted on ingest this wasn't a problem, as it would rewrite the array, we no longer do this and so the memory accounting is impacted by this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181269851


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -205,8 +205,8 @@ mod tests {
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
             "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
             "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99    | 99    | 86          | 86          | 301  | 296  | 301   | 1004  | 305  | 305  | 301  | 301  | 1001  | 1002  | 1001   | 289    |",
+            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",

Review Comment:
   Since ts doesn't consist of unique values, result is not deterministic. In another Pr I will fix it. As far as this PR is concerned you are right, output order is not guaranteed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org