Posted to github@arrow.apache.org by "askoa (via GitHub)" <gi...@apache.org> on 2023/02/11 20:15:28 UTC

[GitHub] [arrow-rs] askoa opened a new pull request, #3695: WIP: feat: Sort kernel for `RunArray`

askoa opened a new pull request, #3695:
URL: https://github.com/apache/arrow-rs/pull/3695

   # Which issue does this PR close?
   Part of #3520 
   
   # Rationale for this change
   See issue description 
   
   # What changes are included in this PR?
   Built on top of the yet-to-be-merged PR #3681
   - `sort_to_indices` for `RunArray`
   
   # Are there any user-facing changes?
   Users will get a new sort function for `RunArray`
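
   For illustration, the run-level behaviour described in the `sort_run` doc comment quoted in this thread can be sketched in plain Rust. This is a simplified model, not the PR's code: it uses `Vec`s instead of Arrow arrays, ignores slicing offsets, nulls and `limit`, and `sort_runs` is a hypothetical name.

```rust
/// Hypothetical helper (not part of arrow-rs): sorts a run-end encoded
/// sequence without decoding it. `run_ends[i]` is the exclusive logical end
/// of run `i` and `values[i]` is the value repeated in that run.
fn sort_runs(run_ends: &[usize], values: &[i32]) -> (Vec<usize>, Vec<i32>) {
    assert_eq!(run_ends.len(), values.len());

    // Sort the physical indices by value (stable sort).
    let mut order: Vec<usize> = (0..values.len()).collect();
    order.sort_by_key(|&i| values[i]);

    let mut new_run_ends = Vec::with_capacity(order.len());
    let mut new_values = Vec::with_capacity(order.len());
    let mut end = 0;
    for i in order {
        // Length of run `i` is the distance to the previous run end.
        let start = if i == 0 { 0 } else { run_ends[i - 1] };
        end += run_ends[i] - start;
        new_run_ends.push(end);
        new_values.push(values[i]);
    }
    // Adjacent equal values are deliberately NOT merged, so the output has
    // the same number of runs as the input.
    (new_run_ends, new_values)
}
```

   With `run_ends = [2, 4, 6, 8]` and `values = [1, 2, 1, 2]` this returns `([2, 4, 6, 8], [1, 1, 2, 2])`, matching the example in the doc comment.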
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] askoa commented on a diff in pull request #3695: feat: Sort kernel for `RunArray`

Posted by "askoa (via GitHub)" <gi...@apache.org>.
askoa commented on code in PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#discussion_r1106605544


##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,206 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(

Review Comment:
   That approach is going to take a lot more time, as `take_run` has to convert the logical indices to physical indices. Based on the latest `take_run` benchmark results (https://github.com/apache/arrow-rs/pull/3705), `take_run` takes around 30 µs. So `sort_to_indices + take` might take somewhere around 40 µs, compared to the 10 µs taken by `sort_run`.
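
   To make the cost concrete, here is a minimal sketch of a logical-to-physical index conversion, assuming the run ends are held in a plain sorted slice. `physical_index` is a hypothetical name and this is not how `take_run` is necessarily implemented; it only shows that each logical index needs a search over the run ends.

```rust
/// Hypothetical helper (not the arrow-rs implementation): returns the
/// physical (run) index that contains `logical_index`, or `None` if the
/// logical index is past the end of the array.
fn physical_index(run_ends: &[usize], logical_index: usize) -> Option<usize> {
    // `run_ends` is strictly increasing, so the run containing
    // `logical_index` is the first run whose end is greater than it.
    // `partition_point` finds that run with a binary search.
    let idx = run_ends.partition_point(|&end| end <= logical_index);
    (idx < run_ends.len()).then_some(idx)
}
```

   Each lookup costs O(log r) for r runs, and a `take` over n logical indices repeats it n times, whereas a run-level sort only touches the r physical values.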
   
   





[GitHub] [arrow-rs] tustvold merged pull request #3695: feat: Sort kernel for `RunArray`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695




[GitHub] [arrow-rs] askoa commented on a diff in pull request #3695: feat: Sort kernel for `RunArray`

Posted by "askoa (via GitHub)" <gi...@apache.org>.
askoa commented on code in PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#discussion_r1114306834


##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,208 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    match values.data_type() {
+        DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() {
+            DataType::Int16 => sort_run_downcasted::<Int16Type>(values, options, limit),
+            DataType::Int32 => sort_run_downcasted::<Int32Type>(values, options, limit),
+            DataType::Int64 => sort_run_downcasted::<Int64Type>(values, options, limit),
+            dt => unreachable!("Not valid run ends data type {dt}"),
+        },
+        dt => Err(ArrowError::InvalidArgumentError(format!(
+            "Input is not a run encoded array. Input data type {dt}"
+        ))),
+    }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, options, None)?;
+
+    // Determine the length of output run array.
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let run_ends = run_array.run_ends();
+
+    let mut new_run_ends_builder = BufferBuilder::<R::Native>::new(physical_len);
+    let mut new_run_end: usize = 0;
+    let mut new_physical_len: usize = 0;
+
+    // calculate run length of sorted value indices and add them to new_run_ends.
+    for physical_index in values_indices.into_iter() {
+        // As the values were sliced with offset = start_physical_index, it has to be added back
+        // before accesing `RunArray::run_ends`
+        let physical_index = physical_index.unwrap() as usize + start_physical_index;
+
+        // calculate the run length.
+        let run_length = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of start_physical_index
+            // and remaining_len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                run_ends.value_unchecked(physical_index).as_usize() - run_array.offset()
+            } else if physical_index == end_physical_index {
+                run_array.offset() + run_array.len()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            } else {
+                run_ends.value_unchecked(physical_index).as_usize()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            }
+        };
+        let run_length = run_length.min(remaining_len);
+        new_run_end += run_length;
+        new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+        new_physical_len += 1;
+
+        remaining_len -= run_length;
+        if remaining_len == 0 {
+            break;
+        }
+    }
+
+    if remaining_len > 0 {
+        panic!("Remaining length should be zero its values is {remaining_len}")
+    }
+
+    let new_run_ends = unsafe {
+        // Safety:
+        // The function builds a valid run_ends array and hence need not be validated.
+        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+            .len(new_physical_len)
+            .null_count(0)
+            .add_buffer(new_run_ends_builder.finish())
+            .build_unchecked()
+    };
+
+    // slice the sorted value indices based on limit.
+    let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+        .slice(0, new_run_ends.len())
+        .into_data()
+        .into();
+
+    let new_values = take(&run_values, &new_values_indices, None)?;
+
+    // Build sorted run array
+    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+        .len(new_run_end)
+        .add_child_data(new_run_ends)
+        .add_child_data(new_values.into_data());
+    let array_data: RunArray<R> = unsafe {
+        // Safety:
+        //  This function builds a valid run array and hence can skip validation.
+        builder.build_unchecked().into()
+    };
+    Ok(Arc::new(array_data))
+}
+
+// Sort to indices for run encoded array.
+// This function will be slow for run array as it decodes the physical indices to
+// logical indices and to get the run array back, the logical indices has to be
+// encoded back to run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, Some(*options), None).unwrap();
+
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let mut result: Vec<u32> = Vec::with_capacity(remaining_len);
+
+    let run_ends = run_array.run_ends();
+
+    // Calculate `run length` of sorted value indices.
+    // Find the `logical index` of the value index.
+    // Add `logical index` to the output `run length` times.
+    for physical_index in values_indices.into_iter() {
+        // As the values were sliced with offset = start_physical_index, it has to be added back
+        // before accesing `RunArray::run_ends`
+        let physical_index = physical_index.unwrap() as usize + start_physical_index;
+
+        // calculate the run length and logical index of sorted values
+        let (run_length, logical_index_start) = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of start_physical_index
+            // and len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                (
+                    run_ends.value_unchecked(physical_index).as_usize()
+                        - run_array.offset(),
+                    0,
+                )
+            } else if physical_index == end_physical_index {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_array.offset() + run_array.len() - prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            } else {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_ends.value_unchecked(physical_index).as_usize() - prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            }

Review Comment:
   This makes sense. Both functions look very similar for the most part. I used a consumer to consume run_length and logical_start.
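
   A minimal sketch of what such a consumer-based helper could look like, in plain Rust. The names are hypothetical and this is not the code that was merged; it assumes an unsliced array (offset 0) with run ends in a plain slice.

```rust
/// Hypothetical shared walker: visits runs in sorted order and hands each
/// run's length and logical start to `consume`, honouring an optional limit.
fn for_each_sorted_run<F: FnMut(usize, usize)>(
    run_ends: &[usize],
    sorted_physical: &[usize],
    limit: Option<usize>,
    mut consume: F,
) {
    // Logical length of the array is the last run end.
    let total = run_ends.last().copied().unwrap_or(0);
    let mut remaining = limit.map_or(total, |l| l.min(total));
    for &p in sorted_physical {
        if remaining == 0 {
            break;
        }
        let logical_start = if p == 0 { 0 } else { run_ends[p - 1] };
        // The last emitted run may be truncated by the limit.
        let run_length = (run_ends[p] - logical_start).min(remaining);
        consume(run_length, logical_start);
        remaining -= run_length;
    }
}
```

   A sort that returns a run array would pass a closure that accumulates new run ends from `run_length`, while a `sort_to_indices` variant would pass one that pushes `logical_start..logical_start + run_length` into the result.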





[GitHub] [arrow-rs] ursabot commented on pull request #3695: feat: Sort kernel for `RunArray`

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#issuecomment-1441475084

   Benchmark runs are scheduled for baseline = e753dea6634524e821eba60f6c7d91293a317120 and contender = ebe6f539844ba781553c87bdaa2dd25190047c49. ebe6f539844ba781553c87bdaa2dd25190047c49 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/c828e3af794d49af891d89e54050e23e...468b98a237704b1ab65b4a38a43284d8/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/4f9b8cd21ef34886b9541e44beb1684a...213b9624b597441e9e8cf7376674e617/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/6097f452d2964e60abed0d5bae10abbc...b29290d90e2f4c159f60506ee2743c4f/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/044fdc862d7f460eb0c5279a3acab372...ce8583b85470464080901459e4457bb4/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #3695: feat: Sort kernel for `RunArray`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#discussion_r1113192078


##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,208 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    match values.data_type() {
+        DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() {
+            DataType::Int16 => sort_run_downcasted::<Int16Type>(values, options, limit),
+            DataType::Int32 => sort_run_downcasted::<Int32Type>(values, options, limit),
+            DataType::Int64 => sort_run_downcasted::<Int64Type>(values, options, limit),
+            dt => unreachable!("Not valid run ends data type {dt}"),
+        },
+        dt => Err(ArrowError::InvalidArgumentError(format!(
+            "Input is not a run encoded array. Input data type {dt}"
+        ))),
+    }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, options, None)?;
+
+    // Determine the length of output run array.
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let run_ends = run_array.run_ends();
+
+    let mut new_run_ends_builder = BufferBuilder::<R::Native>::new(physical_len);
+    let mut new_run_end: usize = 0;
+    let mut new_physical_len: usize = 0;
+
+    // calculate run length of sorted value indices and add them to new_run_ends.
+    for physical_index in values_indices.into_iter() {

Review Comment:
   ```suggestion
       for physical_index in values_indices.values() {
   ```
   This might be marginally faster, as we know the null count must be 0. Perhaps we could add an `assert_eq!(values_indices.null_count(), 0);` just to be sure.



##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,208 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    match values.data_type() {
+        DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() {
+            DataType::Int16 => sort_run_downcasted::<Int16Type>(values, options, limit),
+            DataType::Int32 => sort_run_downcasted::<Int32Type>(values, options, limit),
+            DataType::Int64 => sort_run_downcasted::<Int64Type>(values, options, limit),
+            dt => unreachable!("Not valid run ends data type {dt}"),
+        },
+        dt => Err(ArrowError::InvalidArgumentError(format!(
+            "Input is not a run encoded array. Input data type {dt}"
+        ))),
+    }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, options, None)?;
+
+    // Determine the length of output run array.
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let run_ends = run_array.run_ends();
+
+    let mut new_run_ends_builder = BufferBuilder::<R::Native>::new(physical_len);
+    let mut new_run_end: usize = 0;
+    let mut new_physical_len: usize = 0;
+
+    // calculate run length of sorted value indices and add them to new_run_ends.
+    for physical_index in values_indices.into_iter() {
+        // As the values were sliced with offset = start_physical_index, it has to be added back
+        // before accesing `RunArray::run_ends`
+        let physical_index = physical_index.unwrap() as usize + start_physical_index;
+
+        // calculate the run length.
+        let run_length = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of start_physical_index
+            // and remaining_len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                run_ends.value_unchecked(physical_index).as_usize() - run_array.offset()
+            } else if physical_index == end_physical_index {
+                run_array.offset() + run_array.len()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            } else {
+                run_ends.value_unchecked(physical_index).as_usize()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            }
+        };
+        let run_length = run_length.min(remaining_len);
+        new_run_end += run_length;
+        new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+        new_physical_len += 1;
+
+        remaining_len -= run_length;
+        if remaining_len == 0 {
+            break;
+        }
+    }
+
+    if remaining_len > 0 {
+        panic!("Remaining length should be zero its values is {remaining_len}")
+    }
+
+    let new_run_ends = unsafe {
+        // Safety:
+        // The function builds a valid run_ends array and hence need not be validated.
+        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+            .len(new_physical_len)
+            .null_count(0)
+            .add_buffer(new_run_ends_builder.finish())
+            .build_unchecked()
+    };
+
+    // slice the sorted value indices based on limit.
+    let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+        .slice(0, new_run_ends.len())
+        .into_data()
+        .into();
+
+    let new_values = take(&run_values, &new_values_indices, None)?;
+
+    // Build sorted run array
+    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+        .len(new_run_end)
+        .add_child_data(new_run_ends)
+        .add_child_data(new_values.into_data());
+    let array_data: RunArray<R> = unsafe {
+        // Safety:
+        //  This function builds a valid run array and hence can skip validation.
+        builder.build_unchecked().into()
+    };
+    Ok(Arc::new(array_data))
+}
+
+// Sort to indices for run encoded array.
+// This function will be slow for run array as it decodes the physical indices to
+// logical indices and to get the run array back, the logical indices has to be
+// encoded back to run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, Some(*options), None).unwrap();
+
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let mut result: Vec<u32> = Vec::with_capacity(remaining_len);
+
+    let run_ends = run_array.run_ends();
+
+    // Calculate `run length` of sorted value indices.
+    // Find the `logical index` of the value index.
+    // Add `logical index` to the output `run length` times.
+    for physical_index in values_indices.into_iter() {

Review Comment:
   ```suggestion
       for physical_index in values_indices.values() {
   ```



##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,208 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    match values.data_type() {
+        DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() {
+            DataType::Int16 => sort_run_downcasted::<Int16Type>(values, options, limit),
+            DataType::Int32 => sort_run_downcasted::<Int32Type>(values, options, limit),
+            DataType::Int64 => sort_run_downcasted::<Int64Type>(values, options, limit),
+            dt => unreachable!("Not valid run ends data type {dt}"),
+        },
+        dt => Err(ArrowError::InvalidArgumentError(format!(
+            "Input is not a run encoded array. Input data type {dt}"
+        ))),
+    }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, options, None)?;
+
+    // Determine the length of output run array.
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let run_ends = run_array.run_ends();
+
+    let mut new_run_ends_builder = BufferBuilder::<R::Native>::new(physical_len);
+    let mut new_run_end: usize = 0;
+    let mut new_physical_len: usize = 0;
+
+    // calculate run length of sorted value indices and add them to new_run_ends.
+    for physical_index in values_indices.into_iter() {
+        // As the values were sliced with offset = start_physical_index, it has to be added back
+        // before accesing `RunArray::run_ends`
+        let physical_index = physical_index.unwrap() as usize + start_physical_index;
+
+        // calculate the run length.
+        let run_length = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of start_physical_index
+            // and remaining_len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                run_ends.value_unchecked(physical_index).as_usize() - run_array.offset()
+            } else if physical_index == end_physical_index {
+                run_array.offset() + run_array.len()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            } else {
+                run_ends.value_unchecked(physical_index).as_usize()
+                    - run_ends.value_unchecked(physical_index - 1).as_usize()
+            }
+        };
+        let run_length = run_length.min(remaining_len);
+        new_run_end += run_length;
+        new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+        new_physical_len += 1;
+
+        remaining_len -= run_length;
+        if remaining_len == 0 {
+            break;
+        }
+    }
+
+    if remaining_len > 0 {
+        panic!("Remaining length should be zero its values is {remaining_len}")
+    }
+
+    let new_run_ends = unsafe {
+        // Safety:
+        // The function builds a valid run_ends array and hence need not be validated.
+        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+            .len(new_physical_len)
+            .null_count(0)
+            .add_buffer(new_run_ends_builder.finish())
+            .build_unchecked()
+    };
+
+    // slice the sorted value indices based on limit.
+    let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+        .slice(0, new_run_ends.len())
+        .into_data()
+        .into();
+
+    let new_values = take(&run_values, &new_values_indices, None)?;
+
+    // Build sorted run array
+    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+        .len(new_run_end)
+        .add_child_data(new_run_ends)
+        .add_child_data(new_values.into_data());
+    let array_data: RunArray<R> = unsafe {
+        // Safety:
+        //  This function builds a valid run array and hence can skip validation.
+        builder.build_unchecked().into()
+    };
+    Ok(Arc::new(array_data))
+}
+
+// Sort to indices for run encoded array.
+// This function will be slow for run array as it decodes the physical indices to
+// logical indices and to get the run array back, the logical indices has to be
+// encoded back to run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, Some(*options), None).unwrap();
+
+    let mut remaining_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let mut result: Vec<u32> = Vec::with_capacity(remaining_len);
+
+    let run_ends = run_array.run_ends();
+
+    // Calculate `run length` of sorted value indices.
+    // Find the `logical index` of the value index.
+    // Add `logical index` to the output `run length` times.
+    for physical_index in values_indices.into_iter() {
+        // As the values were sliced with offset = start_physical_index, it has to be added back
+        // before accesing `RunArray::run_ends`
+        let physical_index = physical_index.unwrap() as usize + start_physical_index;
+
+        // calculate the run length and logical index of sorted values
+        let (run_length, logical_index_start) = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of start_physical_index
+            // and len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                (
+                    run_ends.value_unchecked(physical_index).as_usize()
+                        - run_array.offset(),
+                    0,
+                )
+            } else if physical_index == end_physical_index {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_array.offset() + run_array.len() - prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            } else {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_ends.value_unchecked(physical_index).as_usize() - prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            }

Review Comment:
   I wonder if there is some way to extract this logic into a function, ideally one that can be shared with `sort_run_downcasted`. Perhaps it is a function that returns the logical index range for a given physical index?
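
   One possible shape for such a function, sketched in plain Rust. `logical_range` is a hypothetical name, the run ends are modelled as a plain slice, and the caller is assumed to pass only physical indices that overlap the slice `[offset, offset + len)`.

```rust
use std::ops::Range;

/// Hypothetical helper: logical index range (relative to the sliced array)
/// covered by the run at `physical_index`.
/// Assumes the run overlaps the slice `[offset, offset + len)`.
fn logical_range(
    run_ends: &[usize],
    offset: usize,
    len: usize,
    physical_index: usize,
) -> Range<usize> {
    let run_start = if physical_index == 0 {
        0
    } else {
        run_ends[physical_index - 1]
    };
    let run_end = run_ends[physical_index];
    // Clamp the run to the slice, then rebase so the slice starts at 0.
    // This covers the first-run, last-run and middle-run cases in one expression.
    let start = run_start.max(offset) - offset;
    let end = run_end.min(offset + len) - offset;
    start..end
}
```

   The run length is then `range.len()` and the logical start is `range.start`, which is the pair both sort functions compute in their three-way `if`.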





[GitHub] [arrow-rs] askoa commented on pull request #3695: feat: Sort kernel for `RunArray`

Posted by "askoa (via GitHub)" <gi...@apache.org>.
askoa commented on PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#issuecomment-1431679476

   I just noticed a key issue in this code, so I am changing this PR to a draft.




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #3695: feat: Sort kernel for `RunArray`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#discussion_r1106244242


##########
arrow-array/src/run_iterator.rs:
##########
@@ -57,13 +57,8 @@ where
 {
     /// create a new iterator
     pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
-        let current_front_physical: usize =
-            array.run_array().get_physical_index(0).unwrap();
-        let current_back_physical: usize = array
-            .run_array()
-            .get_physical_index(array.len() - 1)
-            .unwrap()
-            + 1;
+        let current_front_physical: usize = array.run_array().get_start_physical_index();
+        let current_back_physical: usize = array.run_array().get_end_physical_index() + 1;

Review Comment:
   ```suggestion
           let current_front_physical = array.run_array().get_start_physical_index();
           let current_back_physical = array.run_array().get_end_physical_index() + 1;
   ```



##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,206 @@ fn insert_valid_values<T>(result_slice: &mut [u32], offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(

Review Comment:
   How much of a performance difference does this special case make? At least for query engines, it is very rare to sort a single column by itself; it is more common to sort multiple columns by one (i.e. `sort_to_indices`) or more (i.e. `lexsort_to_indices`) other columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org