Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/08 17:23:06 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request, #2182: fix: Sort with a lot of repetition values

yjshen opened a new pull request, #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   
   When sorting by a column with many repeated values, the indices output from a single batch may be non-contiguous. This breaks the current assumption that indices are always contiguous and makes `extend` output the wrong slices.
   
   For example, in our test on a 1 TB TPC-DS dataset, sorting by `inventory.inv_date_sk`, we observed these consecutive output indices:
   
   ```
   CompositeIndex { batch_idx: 68, row_idx: 4164 }
   CompositeIndex { batch_idx: 68, row_idx: 4293 }
   CompositeIndex { batch_idx: 68, row_idx: 4294 }
   CompositeIndex { batch_idx: 68, row_idx: 4295 }
   CompositeIndex { batch_idx: 68, row_idx: 4296 }
   CompositeIndex { batch_idx: 68, row_idx: 4297 }
   ```
   Emitting a single slice that starts at `start_row_idx` 4164 with length 6 is wrong: it covers rows 4164 to 4169, but rows 4165 to 4169 should be output elsewhere, and rows 4293 to 4297 are left out. It is also likely that 4296 or 4297 point to the next sort key.
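
   The following is a minimal, self-contained Rust sketch (not the DataFusion source) that replays these indices through both approaches. `CompositeSlice` is redefined locally with the field names from the diff; `old_slice` and `group_sorted` are hypothetical helpers that mimic the old `min`-based logic and the sort-then-group idea behind the new `group_indices`.

   ```rust
   /// Local stand-in for DataFusion's `CompositeSlice`, so the sketch compiles on its own.
   #[derive(Debug, PartialEq)]
   struct CompositeSlice {
       batch_idx: u32,
       start_row_idx: u32,
       len: usize,
   }

   /// Old behaviour (hypothetical helper): one slice starting at the smallest
   /// row index seen, with `len` equal to the number of rows in the run.
   fn old_slice(batch_idx: u32, rows: &[u32]) -> CompositeSlice {
       let start_row_idx = *rows.iter().min().expect("run must not be empty");
       CompositeSlice { batch_idx, start_row_idx, len: rows.len() }
   }

   /// Fixed behaviour, sketched: sort the row indices, then emit one slice
   /// per maximal run of consecutive values.
   fn group_sorted(batch_idx: u32, rows: &[u32]) -> Vec<CompositeSlice> {
       let mut positions = rows.to_vec();
       positions.sort_unstable();
       let mut out = Vec::new();
       let mut iter = positions.into_iter();
       let first = match iter.next() {
           Some(p) => p,
           None => return out,
       };
       let (mut start, mut len) = (first, 1usize);
       for p in iter {
           if p == start + len as u32 {
               len += 1; // `p` extends the current run
           } else {
               out.push(CompositeSlice { batch_idx, start_row_idx: start, len });
               start = p;
               len = 1;
           }
       }
       out.push(CompositeSlice { batch_idx, start_row_idx: start, len });
       out
   }

   fn main() {
       // Row indices from batch 68 in the example above, in output order.
       let rows: [u32; 6] = [4164, 4293, 4294, 4295, 4296, 4297];
       // Old logic: start 4164, len 6, i.e. rows 4164..=4169 (wrong).
       println!("{:?}", old_slice(68, &rows));
       // Fixed logic: two slices, 4164 (len 1) and 4293 (len 5).
       println!("{:?}", group_sorted(68, &rows));
   }
   ```

   Sorting the row indices of a run should not change the key order of the output: each input batch is pre-sorted, so within one batch ascending row order is also non-decreasing key order.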
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   Bug fix, along with a minimal Parquet test file that reproduces the bug.
   
   # Are there any user-facing changes?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846405555


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        let indices = indices_in_batch.drain(..).collect::<Vec<_>>();

Review Comment:
   Same here?



[GitHub] [arrow-datafusion] Dandandan merged pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
Dandandan merged PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846379390


##########
datafusion/core/tests/sql/order.rs:
##########
@@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> {
     assert_eq!(results.len(), 0);
     Ok(())
 }
+
+#[tokio::test]
+async fn sort_with_lots_of_repetition_values() -> Result<()> {

Review Comment:
   ❤️  love the tests



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846380977


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];

Review Comment:
   I don't understand why using a `Vec` here is better. Aren't the indices a contiguous range?
   
   I wonder about using `Option<std::ops::Range>` instead: https://doc.rust-lang.org/std/ops/struct.Range.html
   
   Maybe as a follow-on PR.
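
   A minimal sketch of the follow-on idea, assuming the row indices are already sorted and distinct (which `group_indices` arranges by sorting first): each run is tracked as an `Option<Range<u32>>` instead of a `start`/`len` pair. `sorted_runs` is a hypothetical name. On its own a range cannot replace the `Vec`, because, as yjshen explains in reply, the unsorted indices from one batch may form several disjoint ranges.

   ```rust
   use std::ops::Range;

   /// Hypothetical helper: collapse sorted, distinct row indices into
   /// half-open ranges of consecutive values, tracking the open run as an
   /// `Option<Range<u32>>`.
   fn sorted_runs(sorted_rows: &[u32]) -> Vec<Range<u32>> {
       let mut runs = Vec::new();
       let mut current: Option<Range<u32>> = None;
       for &row in sorted_rows {
           current = match current {
               // The next row follows directly: extend the open run.
               Some(r) if r.end == row => Some(r.start..row + 1),
               // Gap: close the open run and start a new one.
               Some(r) => {
                   runs.push(r);
                   Some(row..row + 1)
               }
               None => Some(row..row + 1),
           };
       }
       // Flush the last open run, if any.
       runs.extend(current);
       runs
   }

   fn main() {
       // Two ranges: 4164..4165 and 4293..4298.
       println!("{:?}", sorted_runs(&[4164, 4293, 4294, 4295, 4296, 4297]));
   }
   ```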



[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846386780


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;

Review Comment:
   No. As described in the PR description, the bug comes from non-contiguous indices, which are introduced by the unstable lexsort. So we may see several disjoint ranges coming from one batch: the rows in the gaps between the ranges have the same key, but the unstable sort moved them elsewhere in the output.
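
   The invariant the old code relied on can be stated as a check. This is a hypothetical helper, not part of the PR: for distinct row indices in any order, a single slice starting at the minimum index with length `rows.len()` covers exactly those rows if and only if `max - min + 1 == rows.len()`. Out-of-order but contiguous indices (the case the old `min` comment anticipated) pass; disjoint ranges like the ones above do not.

   ```rust
   /// Hypothetical check: true when the distinct row indices in `rows` (in any
   /// order) form one contiguous block, i.e. when the old `min`-based slice
   /// would cover exactly these rows.
   fn covers_exactly(rows: &[u32]) -> bool {
       match (rows.iter().min(), rows.iter().max()) {
           (Some(&lo), Some(&hi)) => (hi - lo) as usize + 1 == rows.len(),
           _ => false, // an empty run has nothing to cover
       }
   }

   fn main() {
       // Out of order but contiguous: the old logic handled this case.
       println!("{}", covers_exactly(&[4295, 4293, 4294]));
       // Disjoint ranges from one batch: the old logic emitted the wrong rows.
       println!("{}", covers_exactly(&[4164, 4293, 4294, 4295, 4296, 4297]));
   }
   ```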



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846388862


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;

Review Comment:
   Ah, I see 👍 -- that is a tricky one. So sad.



[GitHub] [arrow-datafusion] alamb commented on pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#issuecomment-1094265321

   > I wonder whether it would be better to switch to a stable sort in the arrow-rs lexsort implementation instead (offering both stable and unstable sorts), for performance.
   
   @Jimexist added unstable sorting here: https://github.com/apache/arrow-rs/pull/552
   
   Maybe we could add a choice of which one to use.


[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846559198


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        let indices = indices_in_batch.drain(..).collect::<Vec<_>>();

Review Comment:
   Done, thanks!



[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846564589


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+        group_indices(last_batch_idx, indices, &mut slices);
 
         self.pos += current_size;
         Some(slices)
     }
 }
 
+/// Group continuous indices into a slice for better `extend` performance
+#[allow(clippy::stable_sort_primitive)]
+fn group_indices(
+    batch_idx: u32,
+    mut positions: Vec<u32>,
+    output: &mut Vec<CompositeSlice>,
+) {
+    // use sort instead of sort_unstable since it's likely nearly sorted.
+    positions.sort();

Review Comment:
   Per the docs for `sort` and `sort_unstable`, `sort` seemed more suitable for nearly sorted indices. However, after running `cargo test --release` on the newly introduced `sort_with_lots_of_repetition_values` test, `sort_unstable` yields better performance even on nearly sorted values.
   
   Changed to use `sort_unstable`, thanks!
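
   Whether stability matters here can be checked directly: for a primitive such as `u32`, equal elements are indistinguishable, so `sort` and `sort_unstable` must produce the same output and differ only in speed. This is the situation Clippy's `stable_sort_primitive` lint flags, which the diff silenced with `#[allow(...)]`. A minimal sketch with a hypothetical helper:

   ```rust
   /// Hypothetical helper: sorts two copies of `values`, one with the stable
   /// `sort` and one with `sort_unstable`, and returns both results.
   fn sorted_both_ways(values: &[u32]) -> (Vec<u32>, Vec<u32>) {
       let mut stable = values.to_vec();
       let mut unstable = values.to_vec();
       stable.sort();
       unstable.sort_unstable();
       (stable, unstable)
   }

   fn main() {
       // Nearly sorted row indices with a few elements out of place.
       let (stable, unstable) = sorted_both_ways(&[4164, 4293, 4295, 4294, 4296, 4297, 4165]);
       // Equal `u32` values are indistinguishable, so the outputs always match.
       assert_eq!(stable, unstable);
       println!("{:?}", unstable);
   }
   ```

   Which of the two is faster on a given input remains an empirical question; the measurement in the comment above found `sort_unstable` faster on this workload.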



[GitHub] [arrow-datafusion] yjshen commented on pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#issuecomment-1093142724

   cc @richox


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846381517


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;

Review Comment:
   Is this the bug -- that `len` should be reset to 0 rather than 1?



[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846406278


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+        group_indices(last_batch_idx, indices, &mut slices);
 
         self.pos += current_size;
         Some(slices)
     }
 }
 
+/// Group continuous indices into a slice for better `extend` performance
+#[allow(clippy::stable_sort_primitive)]
+fn group_indices(
+    batch_idx: u32,
+    mut positions: Vec<u32>,
+    output: &mut Vec<CompositeSlice>,
+) {
+    // use sort instead of sort_unstable since it's likely nearly sorted.
+    positions.sort();

Review Comment:
   This won't give different results, no? Why stable sort here?



[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846405031


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);

Review Comment:
   Could we just pass `indices_in_batch` and then `clear` it, instead of `drain(..).collect()`?
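   A minimal sketch of the suggestion, assuming the grouping helper only needs to read the indices and can therefore borrow a `&[u32]` rather than take ownership of a `Vec`. `for_each_batch_run` and the `flush` callback are hypothetical names for illustration. `Vec::clear` empties the buffer but keeps its allocation, whereas `drain(..).collect::<Vec<_>>()` allocates a fresh `Vec` for every batch change.

```rust
/// Walks `(batch_idx, row_idx)` pairs in output order and hands each run
/// of same-batch rows to `flush` as a borrowed slice. The buffer is
/// cleared after each flush (not drained into a new Vec), so its
/// allocation is reused across batches. Hypothetical helper.
fn for_each_batch_run<F>(pairs: &[(u32, u32)], mut flush: F)
where
    F: FnMut(u32, &[u32]),
{
    let mut buf: Vec<u32> = Vec::new();
    let mut last_batch = 0u32;
    for &(batch_idx, row_idx) in pairs {
        if !buf.is_empty() && batch_idx != last_batch {
            flush(last_batch, &buf);
            buf.clear(); // length becomes 0, capacity is retained
        }
        last_batch = batch_idx;
        buf.push(row_idx);
    }
    if !buf.is_empty() {
        flush(last_batch, &buf);
    }
}
```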





[GitHub] [arrow-datafusion] Dandandan commented on pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
Dandandan commented on PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#issuecomment-1093800452

   Good to have this fixed!
   
   I wonder if it would be better, for performance, to switch to a stable sort in the arrow-rs lexsort implementation instead (offering both stable and unstable sort).
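   To illustrate why a stable sort would avoid the problem, here is a sketch using only the Rust standard library, not the arrow-rs lexsort API. `stable_argsort` is a hypothetical helper. `slice::sort_by_key` is documented as stable, so positions with equal keys keep their original relative order; `sort_unstable_by_key` gives no such guarantee, which is how equal-key rows from one pre-sorted batch can end up reordered and split into non-contiguous runs.

```rust
/// Returns the permutation that orders `keys` ascending. Because
/// `sort_by_key` is stable, positions with equal keys stay in their
/// original (ascending) order. Hypothetical helper.
fn stable_argsort(keys: &[i64]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by_key(|&i| keys[i]);
    idx
}
```

   For keys `[2, 1, 2, 1]` this returns `[1, 3, 0, 2]`: within each key the original positions remain ascending.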
   




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2182: fix: Sort with a lot of repetition values

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2182:
URL: https://github.com/apache/arrow-datafusion/pull/2182#discussion_r846387172


##########
datafusion/core/tests/sql/order.rs:
##########
@@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> {
     assert_eq!(results.len(), 0);
     Ok(())
 }
+
+#[tokio::test]
+async fn sort_with_lots_of_repetition_values() -> Result<()> {

Review Comment:
   BTW I ran the test locally without the changes in this PR to confirm coverage:
   
   ```shell
   cargo test -p datafusion --test sql_integration -- sort_with_lots_of_repetition_values
   ```
   
   It failed with:
   
   ```
   
   ---- sql::order::sort_with_lots_of_repetition_values stdout ----
   thread 'sql::order::sort_with_lots_of_repetition_values' panicked at 'assertion failed: `(left == right)`
     left: `Some(2451809)`,
    right: `Some(2451816)`', datafusion/core/tests/sql/order.rs:220:9
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   ```
   
   👍 



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -426,52 +426,78 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
             let p = self.pos + i;
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same batch,
-                // it must be of the same key with the row pointed by start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+                group_indices(last_batch_idx, indices, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        let indices = indices_in_batch.drain(..).collect::<Vec<_>>();
+        group_indices(last_batch_idx, indices, &mut slices);
 
         self.pos += current_size;
         Some(slices)
     }
 }
 
+/// Group continuous indices into a slice for better `extend` performance

Review Comment:
   I don't really understand how positions can be non-contiguous. Doesn't this function get called each time `batch_idx` changes to a new batch?


