Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/13 17:33:57 UTC

[GitHub] [arrow-datafusion] e-dard opened a new pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

e-dard opened a new pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722


   **NOTE**: this requires Arrow 5.0 containing the following PR: https://github.com/apache/arrow-rs/pull/542
   
   # Which issue does this PR close?
   This will effectively close out https://github.com/apache/arrow-datafusion/issues/655
   
   # Rationale for this change
   
   When merging input record batches into a single output stream, the `SortPreservingMergeExec` operator currently builds a comparator for each column in the inputs every single time it compares one row to another. Because building an Arrow `DynComparator` is costly, this ends up being prohibitively expensive.
   
   I have more details in https://github.com/influxdata/influxdb_iox/issues/1983, but you can get the gist of the problem from this profile:
   
   ![Screenshot 2021-07-12 at 10 50 22](https://user-images.githubusercontent.com/501993/125497581-3d08f363-203e-4699-b75a-ba0f1faf60e4.png)
   
   The same comparator should be reusable for comparing rows from two input record batches until their rows have been completely merged. The rationale for this PR is to ensure that this is what happens.
   
   # What changes are included in this PR?
   
   The state associated with the process of merging two input record batches is managed by a `SortKeyCursor`. It tracks the current row, the columns, the backing record batch and so on. This PR adds a collection of Arrow `DynComparator`s that can be reused every time a `SortKeyCursor` needs to compare one of its rows to a row of another `SortKeyCursor`.
   
   To know whether a cursor was being compared to a cursor it had seen before, I needed to be able to uniquely identify an input record batch. I did this by incrementing an index each time a new record batch was fed into the operator.
   
   When rows are compared, the comparator collection is consulted and the cached `DynComparator` is used if one exists; otherwise, one is created and stored.
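The following is a minimal, std-only sketch of the caching scheme described above. `Cursor`, `DynComparator` and `build_comparator` are hypothetical simplifications, not DataFusion's or Arrow's actual types. The sketch makes several assumptions: every sort column is an `i64` array, ordering is ascending, and there are no nulls. The `builds` counter exists only to show that comparators are built once per pair of batches, not once per row comparison.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for Arrow's `DynComparator`: compares row `i` of one
/// array with row `j` of another; both arrays are captured when it is built.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

fn build_comparator(l: &Arc<Vec<i64>>, r: &Arc<Vec<i64>>) -> DynComparator {
    let (l, r) = (Arc::clone(l), Arc::clone(r));
    Box::new(move |i: usize, j: usize| l[i].cmp(&r[j]))
}

/// Hypothetical, simplified cursor over one record batch (ascending, no nulls).
struct Cursor {
    batch_idx: usize,
    columns: Vec<Arc<Vec<i64>>>,
    cur_row: usize,
    // Keyed by the *other* cursor's batch_idx; the Vec index is the sort
    // column position.
    batch_comparators: HashMap<usize, Vec<DynComparator>>,
    // Number of comparators built so far (for illustration only).
    builds: usize,
}

impl Cursor {
    fn new(batch_idx: usize, columns: Vec<Vec<i64>>) -> Self {
        Cursor {
            batch_idx,
            columns: columns.into_iter().map(Arc::new).collect(),
            cur_row: 0,
            batch_comparators: HashMap::new(),
            builds: 0,
        }
    }

    fn compare(&mut self, other: &Cursor) -> Ordering {
        // Recall or initialise the comparators for `other`'s batch.
        let cmp = self
            .batch_comparators
            .entry(other.batch_idx)
            .or_insert_with(|| Vec::with_capacity(other.columns.len()));
        for (i, (l, r)) in self.columns.iter().zip(other.columns.iter()).enumerate() {
            // Build this column's comparator only the first time it is needed.
            if cmp.len() <= i {
                cmp.push(build_comparator(l, r));
                self.builds += 1;
            }
            match (cmp[i])(self.cur_row, other.cur_row) {
                Ordering::Equal => continue,
                ord => return ord,
            }
        }
        Ordering::Equal
    }
}
```

In this sketch, comparing the same two cursors repeatedly, even at different rows, builds at most one comparator per sort column.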
   
   ### Performance
   
   This PR significantly improves the performance of the `SortPreservingMergeExec` operator because it amortises the cost of creating a comparator over all row comparisons between two record batches, rather than paying that cost for every single row comparison. Here are results from the existing benchmarks:
   
   ```
   $  critcmp master pr
   group                               master                          pr
   -----                               ------                          --
   interleave_batches                  1.83   623.8±12.41µs            1.00    341.2±6.98µs        
   merge_batches_no_overlap_large      1.56    400.6±4.94µs            1.00    256.3±6.57µs        
   merge_batches_no_overlap_small      1.63   425.1±24.88µs            1.00    261.1±7.46µs        
   merge_batches_small_into_large      1.18    228.0±3.95µs            1.00    193.6±2.86µs        
   merge_batches_some_overlap_large    1.68   505.4±10.27µs            1.00    301.3±6.63µs        
   merge_batches_some_overlap_small    1.64    515.7±5.21µs            1.00   314.6±12.66µs        
   ```
   
   The results suggest that the changes in this PR improve performance by up to `1.8x`. Since the performance delta depends on the size of the input, the improvement could be significantly larger for larger inputs.
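The ratios in the table can be checked quickly. Each benchmark's speedup is its master time divided by its PR time. The sketch below hard-codes the mean times from the `critcmp` output. `RESULTS`, `speedup` and `best` are illustrative names only.

```rust
/// Mean times in microseconds from the `critcmp` output: (benchmark, master, pr).
const RESULTS: [(&str, f64, f64); 6] = [
    ("interleave_batches", 623.8, 341.2),
    ("merge_batches_no_overlap_large", 400.6, 256.3),
    ("merge_batches_no_overlap_small", 425.1, 261.1),
    ("merge_batches_small_into_large", 228.0, 193.6),
    ("merge_batches_some_overlap_large", 505.4, 301.3),
    ("merge_batches_some_overlap_small", 515.7, 314.6),
];

/// Speedup of the PR relative to master (higher is better).
fn speedup(master_us: f64, pr_us: f64) -> f64 {
    master_us / pr_us
}

/// The benchmark with the largest speedup.
fn best() -> (&'static str, f64) {
    RESULTS
        .iter()
        .map(|&(name, master, pr)| (name, speedup(master, pr)))
        .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap())
        .unwrap()
}
```

Rounded to two decimal places, these ratios reproduce the 1.83, 1.56, 1.63, 1.18, 1.68 and 1.64 figures in the master column.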
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#issuecomment-881915536


   @e-dard  -- now that https://github.com/apache/arrow-datafusion/pull/721 is merged I took the liberty of rebasing this PR to pick up those changes





[GitHub] [arrow-datafusion] alamb commented on pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#issuecomment-882498542


   One other thing I thought of last night is that this approach will effectively hold references to *all* input `Array`s (as the `DynComparator`s hold reference counts to the arrays in them). I filed https://github.com/apache/arrow-rs/issues/563 to track the improvement.
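This small std-only illustration shows why this happens. It assumes that a comparator captures `Arc` clones of the arrays it compares, as with Arrow's `DynComparator`. The `build_comparator` below is a hypothetical stand-in, not Arrow's API. While a comparator is alive, the strong count on each captured array stays raised, so the array cannot be freed.

```rust
use std::cmp::Ordering;
use std::sync::Arc;

/// Hypothetical stand-in for a bound comparator such as Arrow's `DynComparator`.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

/// The returned closure owns `Arc` clones of both arrays, so they stay
/// allocated for as long as the comparator exists.
fn build_comparator(left: &Arc<Vec<i64>>, right: &Arc<Vec<i64>>) -> DynComparator {
    let (l, r) = (Arc::clone(left), Arc::clone(right));
    Box::new(move |i: usize, j: usize| l[i].cmp(&r[j]))
}
```

Dropping the comparator returns the strong count to what it was before the comparator was built.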





[GitHub] [arrow-datafusion] alamb commented on pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#issuecomment-881915771


   Note to anyone else reviewing / looking at this change: While DataFusion does not (yet) use the `SortPreservingMerge` operator, we use it in IOx and we hope to introduce additional sort-based optimizations to DataFusion (which will require `SortPreservingMerge` or something similar)





[GitHub] [arrow-datafusion] alamb merged pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722


   






[GitHub] [arrow-datafusion] e-dard commented on pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
e-dard commented on pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#issuecomment-882400536


   I'm happy for this to be merged as-and-when. Cheers 👍 





[GitHub] [arrow-datafusion] e-dard commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
e-dard commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r671826737



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       The HashMap only lives as long as the sort preserving merge operator. I suppose you could merge n streams large enough to make the old comparators take up non-negligible memory.







[GitHub] [arrow-datafusion] e-dard commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
e-dard commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r668989904



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       Ha. Yeah that's how I had it but you have to re-create the iterator. I'm happy to go with whatever







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r671826096



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       Do you mean to reduce the peak memory usage by freeing comparators?







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r671830596



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       I personally think the "right" thing to do in this case is to create an arrow comparator interface that doesn't have bound arrays, something like:
   ```
   let comparator = arrow::compute::build_comparator(array1.data_type(), array2.data_type());
   
   ...
   
   if comparator(array1, index_1, array2, index_2) == Ordering::Equal {
     ...
   }
   ```
   
   With that interface we could simply reuse the same comparator for every pair of arrays with matching data types.
   
   So that is to say, I recommend leaving this PR the way it is and putting our efforts into a better comparison interface rather than trying to optimize for a small amount of memory savings here
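Below is a std-only sketch of what such an unbound interface might look like. `DataType`, `Array`, `UnboundComparator` and `build_comparator` are hypothetical stand-ins, not Arrow types. The comparator is chosen once, from the two data types alone, and the arrays are passed in on every call. Because the comparator captures nothing, one comparator can serve any number of batches without keeping their arrays alive.

```rust
use std::cmp::Ordering;

/// Hypothetical, minimal stand-ins for Arrow data types and arrays.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

enum Array {
    Int64(Vec<i64>),
    Utf8(Vec<String>),
}

/// Not bound to any arrays: both arrays are supplied on every call.
type UnboundComparator = fn(&Array, usize, &Array, usize) -> Ordering;

fn cmp_int64(l: &Array, i: usize, r: &Array, j: usize) -> Ordering {
    match (l, r) {
        (Array::Int64(l), Array::Int64(r)) => l[i].cmp(&r[j]),
        _ => panic!("expected two Int64 arrays"),
    }
}

fn cmp_utf8(l: &Array, i: usize, r: &Array, j: usize) -> Ordering {
    match (l, r) {
        (Array::Utf8(l), Array::Utf8(r)) => l[i].cmp(&r[j]),
        _ => panic!("expected two Utf8 arrays"),
    }
}

/// Picks a comparator once, from the data types alone.
fn build_comparator(left: DataType, right: DataType) -> Result<UnboundComparator, String> {
    match (left, right) {
        (DataType::Int64, DataType::Int64) => Ok(cmp_int64 as UnboundComparator),
        (DataType::Utf8, DataType::Utf8) => Ok(cmp_utf8 as UnboundComparator),
        (l, r) => Err(format!("cannot compare {:?} with {:?}", l, r)),
    }
}
```

Because the comparator is a plain function pointer, a cache like `batch_comparators` could be keyed by sort column alone instead of by batch.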







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r671827098



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       Yes, that's what I meant. The memory usage would increase over time by holding onto the older comparators.
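The following sketch shows one way the growth could be bounded. It assumes the merge knows which batches are still live, for example batches whose cursors are not yet exhausted. `ComparatorCache` and `evict_finished` are hypothetical names. Dropping an entry drops its comparators, and with them their references to the arrays.

```rust
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for Arrow's `DynComparator`.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

/// Hypothetical per-cursor cache: other batch_idx -> one comparator per sort column.
struct ComparatorCache {
    by_batch: HashMap<usize, Vec<DynComparator>>,
}

impl ComparatorCache {
    fn new() -> Self {
        ComparatorCache { by_batch: HashMap::new() }
    }

    /// Drop the comparators for every batch that is no longer live. Dropping a
    /// comparator also drops the array references it captured.
    fn evict_finished(&mut self, live_batches: &HashSet<usize>) {
        self.by_batch.retain(|idx, _| live_batches.contains(idx));
    }
}
```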







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r669104634



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       I think this way is fine, personally







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r668985768



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -255,15 +295,11 @@ impl SortKeyCursor {
                 }
                 (true, false) => return Ok(Ordering::Less),
                 (false, false) => {}
-                (true, true) => {
-                    // TODO: Building the predicate each time is sub-optimal

Review comment:
       🎉 

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       I was initially worried that mutating `cmp` below might mask errors (e.g. that the number of columns had grown somehow), so I wonder if it is worth putting the initialization of `cmp` into this callsite (rather than the loop below)?
   
   However, perhaps the current approach avoids having to clone or collect `zipped`.
   
   Please feel free to ignore this comment
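This is a sketch of the alternative suggested here. All per-column comparators for a batch are built eagerly inside the `or_insert_with` closure, and the column count is checked up front. `comparators_for` and the `Arc<Vec<i64>>` columns are hypothetical simplifications. The closure needs its own `zip` over the columns, separate from the iterator later used to compare rows.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for Arrow's `DynComparator`.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

fn build_comparator(l: &Arc<Vec<i64>>, r: &Arc<Vec<i64>>) -> DynComparator {
    let (l, r) = (Arc::clone(l), Arc::clone(r));
    Box::new(move |i: usize, j: usize| l[i].cmp(&r[j]))
}

/// Eager variant: every per-column comparator for `other_batch_idx` is built
/// inside `or_insert_with`, so a cached Vec is never mutated afterwards.
fn comparators_for<'a>(
    cache: &'a mut HashMap<usize, Vec<DynComparator>>,
    other_batch_idx: usize,
    left: &[Arc<Vec<i64>>],
    right: &[Arc<Vec<i64>>],
) -> &'a Vec<DynComparator> {
    // Fail loudly if the two cursors disagree on the number of sort columns.
    assert_eq!(left.len(), right.len(), "sort column count mismatch");
    let cmps = cache.entry(other_batch_idx).or_insert_with(|| {
        // A separate zip from the one used later to walk the rows.
        left.iter().zip(right).map(|(l, r)| build_comparator(l, r)).collect()
    });
    // A cached entry built for a different column count would be a bug.
    assert_eq!(cmps.len(), left.len(), "cached comparators out of date");
    cmps
}
```

The trade-off is an extra pass over the columns on a cache miss. In exchange, a mismatched column count surfaces as an error instead of being masked by the lazy `push`.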

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -176,34 +178,60 @@ impl ExecutionPlan for SortPreservingMergeExec {
     }
 }
 
-/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
-/// evaluated on the `RecordBatch` yield the sort keys.
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
+/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort keys.
 ///
 /// Additionally it maintains a row cursor that can be advanced through the rows
 /// of the provided `RecordBatch`
 ///
-/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
-/// row cursor, with that of another `SortKeyCursor`
-#[derive(Debug, Clone)]
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to
+/// by this row cursor, with that of another `SortKeyCursor`. A cursor stores
+/// a row comparator for each other cursor that it is compared to.
 struct SortKeyCursor {
     columns: Vec<ArrayRef>,
-    batch: RecordBatch,
     cur_row: usize,
     num_rows: usize,
+
+    // An index uniquely identifying the record batch scanned by this cursor.
+    batch_idx: usize,
+    batch: RecordBatch,
+
+    // A collection of comparators that compare rows in this cursor's batch to
+    // the cursors in other batches. Other batches are uniquely identified by
+    // their batch_idx.

Review comment:
       Is it worth mentioning that the indexes of the `Vec` are the sort column positions (not other batch indexes)?







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#discussion_r671825464



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

Review comment:
       Don't we need to clean older `batch_idx` over time?







[GitHub] [arrow-datafusion] e-dard commented on pull request #722: perf: improve performance of `SortPreservingMergeExec` operator

Posted by GitBox <gi...@apache.org>.
e-dard commented on pull request #722:
URL: https://github.com/apache/arrow-datafusion/pull/722#issuecomment-881359516


   Once DataFusion is using Arrow 5.0 (which @alamb informs me should be relatively soon), I will rebase this PR and get it green. 
   
   FWIW I have tested this PR internally and it has effectively eradicated `SortKeyCursor::compare` from our CPU profiles 🚀 

