You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/10 14:58:46 UTC

[arrow-rs] branch active_release updated: refactor lexico sort (#424) (#441)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/active_release by this push:
     new a7656a8  refactor lexico sort (#424) (#441)
a7656a8 is described below

commit a7656a8a3cd1f02e4543e1b971842ca92404f82a
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jun 10 10:58:36 2021 -0400

    refactor lexico sort (#424) (#441)
    
    Co-authored-by: Jiayu Liu <Ji...@users.noreply.github.com>
---
 arrow/Cargo.toml                       |   4 +
 arrow/benches/partition_kernels.rs     | 142 +++++++++++++++
 arrow/benches/sort_kernel.rs           |   4 +-
 arrow/src/compute/kernels/mod.rs       |   1 +
 arrow/src/compute/kernels/partition.rs | 314 +++++++++++++++++++++++++++++++++
 arrow/src/compute/mod.rs               |   1 +
 6 files changed, 464 insertions(+), 2 deletions(-)

diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 610f0f0..96ca6b8 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -124,6 +124,10 @@ name = "sort_kernel"
 harness = false
 
 [[bench]]
+name = "partition_kernels"
+harness = false
+
+[[bench]]
 name = "csv_writer"
 harness = false
 
diff --git a/arrow/benches/partition_kernels.rs b/arrow/benches/partition_kernels.rs
new file mode 100644
index 0000000..6a9ce70
--- /dev/null
+++ b/arrow/benches/partition_kernels.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+use criterion::Criterion;
+use std::sync::Arc;
+extern crate arrow;
+use arrow::compute::kernels::partition::lexicographical_partition_ranges;
+use arrow::compute::kernels::sort::{lexsort, SortColumn};
+use arrow::util::bench_util::*;
+use arrow::{
+    array::*,
+    datatypes::{ArrowPrimitiveType, Float64Type, UInt8Type},
+};
+use rand::distributions::{Distribution, Standard};
+use std::iter;
+
+fn create_array<T: ArrowPrimitiveType>(size: usize, with_nulls: bool) -> ArrayRef
+where
+    Standard: Distribution<T::Native>,
+{
+    // A null density of 0.5 is requested when `with_nulls` is set, 0.0 otherwise.
+    let density = if with_nulls { 0.5 } else { 0.0 };
+    Arc::new(create_primitive_array::<T>(size, density))
+}
+
+fn bench_partition(sorted_columns: &[ArrayRef]) {
+    // Wrap every pre-sorted array in a `SortColumn` without explicit sort options.
+    let mut sort_columns = Vec::with_capacity(sorted_columns.len());
+    for array in sorted_columns {
+        let values = array.clone();
+        sort_columns.push(SortColumn { values, options: None });
+    }
+    // Hand the computed ranges to `black_box` so the call is not treated as dead code.
+    let ranges = lexicographical_partition_ranges(&sort_columns).unwrap();
+    criterion::black_box(ranges);
+}
+
+fn create_sorted_low_cardinality_data(length: usize) -> Vec<ArrayRef> {
+    // Only four distinct values (1 through 4) occur, each one repeated
+    // `length / 4` times, so the array holds at most `length` elements
+    // (fewer when `length` is not a multiple of four).
+    let run_length = length / 4;
+    let values = (1_i64..=4).flat_map(|value| iter::repeat(value).take(run_length));
+
+    let column = SortColumn {
+        values: Arc::new(Int64Array::from_iter_values(values)),
+        options: None,
+    };
+
+    // The runs are generated in ascending order; they still go through
+    // `lexsort` so the data is prepared the same way as in the other
+    // benchmark inputs.
+    lexsort(&[column], None).unwrap()
+}
+
+fn create_sorted_float_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
+    // Both columns get 2^pow rows, each filled by its own `create_array` call.
+    let row_count = 2u64.pow(pow) as usize;
+
+    let first = SortColumn {
+        values: create_array::<Float64Type>(row_count, with_nulls),
+        options: None,
+    };
+    let second = SortColumn {
+        values: create_array::<Float64Type>(row_count, with_nulls),
+        options: None,
+    };
+
+    // Sorting both columns together yields the pre-sorted input for partitioning.
+    lexsort(&[first, second], None).unwrap()
+}
+
+fn create_sorted_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
+    // Both columns get 2^pow rows, each filled by its own `create_array` call.
+    let row_count = 2u64.pow(pow) as usize;
+
+    let first = SortColumn {
+        values: create_array::<UInt8Type>(row_count, with_nulls),
+        options: None,
+    };
+    let second = SortColumn {
+        values: create_array::<UInt8Type>(row_count, with_nulls),
+        options: None,
+    };
+
+    // Sorting both columns together yields the pre-sorted input for partitioning.
+    lexsort(&[first, second], None).unwrap()
+}
+
+fn add_benchmark(c: &mut Criterion) {
+    // Registers one benchmark that partitions the given pre-sorted columns.
+    let mut register = |name: &str, sorted_columns: Vec<ArrayRef>| {
+        c.bench_function(name, |b| b.iter(|| bench_partition(&sorted_columns)));
+    };
+    // Two u8 columns without nulls, 2^10 and 2^12 rows.
+    register(
+        "lexicographical_partition_ranges(u8) 2^10",
+        create_sorted_data(10, false),
+    );
+    register(
+        "lexicographical_partition_ranges(u8) 2^12",
+        create_sorted_data(12, false),
+    );
+    // The same shapes again, this time with nulls requested.
+    register(
+        "lexicographical_partition_ranges(u8) 2^10 with nulls",
+        create_sorted_data(10, true),
+    );
+    register(
+        "lexicographical_partition_ranges(u8) 2^12 with nulls",
+        create_sorted_data(12, true),
+    );
+    // Two f64 columns without nulls, 2^10 rows.
+    register(
+        "lexicographical_partition_ranges(f64) 2^10",
+        create_sorted_float_data(10, false),
+    );
+    // A single i64 column with only four distinct values.
+    register(
+        "lexicographical_partition_ranges(low cardinality) 1024",
+        create_sorted_low_cardinality_data(1024),
+    );
+}
+
+criterion_group!(benches, add_benchmark);
+criterion_main!(benches);
diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs
index 74dc0ce..8467b50 100644
--- a/arrow/benches/sort_kernel.rs
+++ b/arrow/benches/sort_kernel.rs
@@ -33,10 +33,10 @@ fn create_array(size: usize, with_nulls: bool) -> ArrayRef {
     Arc::new(array)
 }
 
-fn bench_sort(arr_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
+fn bench_sort(array_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
     let columns = vec![
         SortColumn {
-            values: arr_a.clone(),
+            values: array_a.clone(),
             options: None,
         },
         SortColumn {
diff --git a/arrow/src/compute/kernels/mod.rs b/arrow/src/compute/kernels/mod.rs
index 862f55f..a0ef50a 100644
--- a/arrow/src/compute/kernels/mod.rs
+++ b/arrow/src/compute/kernels/mod.rs
@@ -28,6 +28,7 @@ pub mod concat;
 pub mod filter;
 pub mod length;
 pub mod limit;
+pub mod partition;
 pub mod regexp;
 pub mod sort;
 pub mod substring;
diff --git a/arrow/src/compute/kernels/partition.rs b/arrow/src/compute/kernels/partition.rs
new file mode 100644
index 0000000..e91f80b
--- /dev/null
+++ b/arrow/src/compute/kernels/partition.rs
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines partition kernel for `ArrayRef`
+
+use crate::compute::kernels::sort::LexicographicalComparator;
+use crate::compute::SortColumn;
+use crate::error::{ArrowError, Result};
+use std::cmp::Ordering;
+use std::ops::Range;
+
+/// Given a list of already sorted columns, find partition ranges that would partition
+/// lexicographically equal values across columns.
+///
+/// Here LexicographicalComparator is used in conjunction with binary
+/// search so the columns *MUST* be pre-sorted already.
+///
+/// The returned vec would be of size k where k is cardinality of the sorted values; consecutive
+/// ranges are connected: (a, b) and (b, c), where start = 0 and end = n for the first and last
+/// range. Columns with zero rows yield an empty vec; an empty `columns` slice is an error.
+pub fn lexicographical_partition_ranges(
+    columns: &[SortColumn],
+) -> Result<Vec<Range<usize>>> {
+    let partition_points = lexicographical_partition_points(columns)?;
+    // `windows(2)` yields nothing for fewer than two points, so zero rows give no ranges
+    Ok(partition_points
+        .windows(2)
+        .map(|points| points[0]..points[1])
+        .collect())
+}
+
+/// Given a list of already sorted columns, find the partition points that separate runs of
+/// lexicographically equal rows across columns.
+///
+/// LexicographicalComparator is used in conjunction with binary search, so the columns
+/// *MUST* be pre-sorted already. An empty `columns` or columns of differing length are errors.
+///
+/// The returned vec is of size k+1 where k is the cardinality of the sorted values; the first
+/// and last value are 0 and n. For columns with zero rows the returned vec is empty.
+fn lexicographical_partition_points(columns: &[SortColumn]) -> Result<Vec<usize>> {
+    if columns.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Sort requires at least one column".to_string(),
+        ));
+    }
+    let row_count = columns[0].values.len();
+    if columns.iter().any(|item| item.values.len() != row_count) {
+        return Err(ArrowError::ComputeError(
+            "Lexical sort columns have different row counts".to_string(),
+        ));
+    };
+
+    let mut result = vec![];
+    if row_count == 0 {
+        return Ok(result);
+    }
+
+    let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
+    let value_indices = (0..row_count).collect::<Vec<usize>>();
+
+    let mut previous_partition_point = 0;
+    result.push(previous_partition_point);
+    while previous_partition_point < row_count {
+        // invariant:
+        // every row in value_indices[0..previous_partition_point] compares <= the row at
+        // previous_partition_point, so it is enough to binary search the remaining slice
+        // value_indices[previous_partition_point..] for the first row that compares greater.
+        // partition_point returns an offset relative to that sub-slice, which is why the
+        // result is _added_ to previous_partition_point.
+        //
+        // idx is a &usize pointing into value_indices, which holds 0..row_count; the
+        // comparator takes these as row indices into the original columnar data.
+        previous_partition_point += value_indices[previous_partition_point..]
+            .partition_point(|idx| {
+                lexicographical_comparator.compare(idx, &previous_partition_point)
+                    != Ordering::Greater
+            });
+        result.push(previous_partition_point);
+    }
+
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::array::*;
+    use crate::compute::SortOptions;
+    use crate::datatypes::DataType;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_lexicographical_partition_points_empty() {
+        // Zero columns (as opposed to columns with zero rows) is the case under test.
+        assert!(
+            lexicographical_partition_points(&[]).is_err(),
+            "lexicographical_partition_points should reject an empty list of columns"
+        );
+    }
+
+    #[test]
+    fn test_lexicographical_partition_points_unaligned_rows() {
+        let input = vec![
+            SortColumn {
+                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef,
+                options: None,
+            },
+            SortColumn {
+                values: Arc::new(StringArray::from(vec![Some("foo")])) as ArrayRef,
+                options: None,
+            },
+        ];
+        assert!(
+            lexicographical_partition_points(&input).is_err(),
+            "lexicographical_partition_points should reject columns with different row counts"
+        );
+    }
+
+    #[test]
+    fn test_lexicographical_partition_single_column() -> Result<()> {
+        let input = vec![SortColumn {
+            values: Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]))
+                as ArrayRef,
+            options: Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+        }];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1, 8, 9], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(
+                vec![(0_usize..1_usize), (1_usize..8_usize), (8_usize..9_usize)],
+                results
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_lexicographical_partition_all_equal_values() -> Result<()> {
+        let input = vec![SortColumn {
+            values: Arc::new(Int64Array::from_value(1, 1000)) as ArrayRef,
+            options: Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+        }];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1000], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(vec![(0_usize..1000_usize)], results);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_lexicographical_partition_all_null_values() -> Result<()> {
+        let input = vec![
+            SortColumn {
+                values: new_null_array(&DataType::Int8, 1000),
+                options: Some(SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                }),
+            },
+            SortColumn {
+                values: new_null_array(&DataType::UInt16, 1000),
+                options: Some(SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                }),
+            },
+        ];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1000], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(vec![(0_usize..1000_usize)], results);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_lexicographical_partition_unique_column_1() -> Result<()> {
+        let input = vec![
+            SortColumn {
+                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef,
+                options: Some(SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                }),
+            },
+            SortColumn {
+                values: Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]))
+                    as ArrayRef,
+                options: Some(SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                }),
+            },
+        ];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1, 2], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(vec![(0_usize..1_usize), (1_usize..2_usize)], results);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_lexicographical_partition_unique_column_2() -> Result<()> {
+        let input = vec![
+            SortColumn {
+                values: Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)]))
+                    as ArrayRef,
+                options: Some(SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                }),
+            },
+            SortColumn {
+                values: Arc::new(StringArray::from(vec![
+                    Some("foo"),
+                    Some("bar"),
+                    Some("apple"),
+                ])) as ArrayRef,
+                options: Some(SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                }),
+            },
+        ];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1, 2, 3], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(
+                vec![(0_usize..1_usize), (1_usize..2_usize), (2_usize..3_usize),],
+                results
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_lexicographical_partition_non_unique_column_1() -> Result<()> {
+        let input = vec![
+            SortColumn {
+                values: Arc::new(Int64Array::from(vec![
+                    None,
+                    Some(-1),
+                    Some(-1),
+                    Some(1),
+                ])) as ArrayRef,
+                options: Some(SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                }),
+            },
+            SortColumn {
+                values: Arc::new(StringArray::from(vec![
+                    Some("foo"),
+                    Some("bar"),
+                    Some("bar"),
+                    Some("bar"),
+                ])) as ArrayRef,
+                options: Some(SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                }),
+            },
+        ];
+        {
+            let results = lexicographical_partition_points(&input)?;
+            assert_eq!(vec![0, 1, 3, 4], results);
+        }
+        {
+            let results = lexicographical_partition_ranges(&input)?;
+            assert_eq!(
+                vec![(0_usize..1_usize), (1_usize..3_usize), (3_usize..4_usize),],
+                results
+            );
+        }
+        Ok(())
+    }
+}
diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs
index 166f156..2b3b9a7 100644
--- a/arrow/src/compute/mod.rs
+++ b/arrow/src/compute/mod.rs
@@ -30,6 +30,7 @@ pub use self::kernels::comparison::*;
 pub use self::kernels::concat::*;
 pub use self::kernels::filter::*;
 pub use self::kernels::limit::*;
+pub use self::kernels::partition::*;
 pub use self::kernels::regexp::*;
 pub use self::kernels::sort::*;
 pub use self::kernels::take::*;