You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/10 14:58:46 UTC
[arrow-rs] branch active_release updated: refactor lexico sort
(#424) (#441)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/active_release by this push:
new a7656a8 refactor lexico sort (#424) (#441)
a7656a8 is described below
commit a7656a8a3cd1f02e4543e1b971842ca92404f82a
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jun 10 10:58:36 2021 -0400
refactor lexico sort (#424) (#441)
Co-authored-by: Jiayu Liu <Ji...@users.noreply.github.com>
---
arrow/Cargo.toml | 4 +
arrow/benches/partition_kernels.rs | 142 +++++++++++++++
arrow/benches/sort_kernel.rs | 4 +-
arrow/src/compute/kernels/mod.rs | 1 +
arrow/src/compute/kernels/partition.rs | 314 +++++++++++++++++++++++++++++++++
arrow/src/compute/mod.rs | 1 +
6 files changed, 464 insertions(+), 2 deletions(-)
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 610f0f0..96ca6b8 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -124,6 +124,10 @@ name = "sort_kernel"
harness = false
[[bench]]
+name = "partition_kernels"
+harness = false
+
+[[bench]]
name = "csv_writer"
harness = false
diff --git a/arrow/benches/partition_kernels.rs b/arrow/benches/partition_kernels.rs
new file mode 100644
index 0000000..6a9ce70
--- /dev/null
+++ b/arrow/benches/partition_kernels.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+use criterion::Criterion;
+use std::sync::Arc;
+extern crate arrow;
+use arrow::compute::kernels::partition::lexicographical_partition_ranges;
+use arrow::compute::kernels::sort::{lexsort, SortColumn};
+use arrow::util::bench_util::*;
+use arrow::{
+ array::*,
+ datatypes::{ArrowPrimitiveType, Float64Type, UInt8Type},
+};
+use rand::distributions::{Distribution, Standard};
+use std::iter;
+
+fn create_array<T: ArrowPrimitiveType>(size: usize, with_nulls: bool) -> ArrayRef
+where
+ Standard: Distribution<T::Native>,
+{
+ let null_density = if with_nulls { 0.5 } else { 0.0 };
+ let array = create_primitive_array::<T>(size, null_density);
+ Arc::new(array)
+}
+
+fn bench_partition(sorted_columns: &[ArrayRef]) {
+ let columns = sorted_columns
+ .iter()
+ .map(|arr| SortColumn {
+ values: arr.clone(),
+ options: None,
+ })
+ .collect::<Vec<_>>();
+
+ criterion::black_box(lexicographical_partition_ranges(&columns).unwrap());
+}
+
+fn create_sorted_low_cardinality_data(length: usize) -> Vec<ArrayRef> {
+ let arr = Int64Array::from_iter_values(
+ iter::repeat(1)
+ .take(length / 4)
+ .chain(iter::repeat(2).take(length / 4))
+ .chain(iter::repeat(3).take(length / 4))
+ .chain(iter::repeat(4).take(length / 4)),
+ );
+ lexsort(
+ &[SortColumn {
+ values: Arc::new(arr),
+ options: None,
+ }],
+ None,
+ )
+ .unwrap()
+}
+
+fn create_sorted_float_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
+ lexsort(
+ &[
+ SortColumn {
+ values: create_array::<Float64Type>(2u64.pow(pow) as usize, with_nulls),
+ options: None,
+ },
+ SortColumn {
+ values: create_array::<Float64Type>(2u64.pow(pow) as usize, with_nulls),
+ options: None,
+ },
+ ],
+ None,
+ )
+ .unwrap()
+}
+
+fn create_sorted_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
+ lexsort(
+ &[
+ SortColumn {
+ values: create_array::<UInt8Type>(2u64.pow(pow) as usize, with_nulls),
+ options: None,
+ },
+ SortColumn {
+ values: create_array::<UInt8Type>(2u64.pow(pow) as usize, with_nulls),
+ options: None,
+ },
+ ],
+ None,
+ )
+ .unwrap()
+}
+
+fn add_benchmark(c: &mut Criterion) {
+ let sorted_columns = create_sorted_data(10, false);
+ c.bench_function("lexicographical_partition_ranges(u8) 2^10", |b| {
+ b.iter(|| bench_partition(&sorted_columns))
+ });
+
+ let sorted_columns = create_sorted_data(12, false);
+ c.bench_function("lexicographical_partition_ranges(u8) 2^12", |b| {
+ b.iter(|| bench_partition(&sorted_columns))
+ });
+
+ let sorted_columns = create_sorted_data(10, true);
+ c.bench_function(
+ "lexicographical_partition_ranges(u8) 2^10 with nulls",
+ |b| b.iter(|| bench_partition(&sorted_columns)),
+ );
+
+ let sorted_columns = create_sorted_data(12, true);
+ c.bench_function(
+ "lexicographical_partition_ranges(u8) 2^12 with nulls",
+ |b| b.iter(|| bench_partition(&sorted_columns)),
+ );
+
+ let sorted_columns = create_sorted_float_data(10, false);
+ c.bench_function("lexicographical_partition_ranges(f64) 2^10", |b| {
+ b.iter(|| bench_partition(&sorted_columns))
+ });
+
+ let sorted_columns = create_sorted_low_cardinality_data(1024);
+ c.bench_function(
+ "lexicographical_partition_ranges(low cardinality) 1024",
+ |b| b.iter(|| bench_partition(&sorted_columns)),
+ );
+}
+
+criterion_group!(benches, add_benchmark);
+criterion_main!(benches);
diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs
index 74dc0ce..8467b50 100644
--- a/arrow/benches/sort_kernel.rs
+++ b/arrow/benches/sort_kernel.rs
@@ -33,10 +33,10 @@ fn create_array(size: usize, with_nulls: bool) -> ArrayRef {
Arc::new(array)
}
-fn bench_sort(arr_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
+fn bench_sort(array_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
let columns = vec![
SortColumn {
- values: arr_a.clone(),
+ values: array_a.clone(),
options: None,
},
SortColumn {
diff --git a/arrow/src/compute/kernels/mod.rs b/arrow/src/compute/kernels/mod.rs
index 862f55f..a0ef50a 100644
--- a/arrow/src/compute/kernels/mod.rs
+++ b/arrow/src/compute/kernels/mod.rs
@@ -28,6 +28,7 @@ pub mod concat;
pub mod filter;
pub mod length;
pub mod limit;
+pub mod partition;
pub mod regexp;
pub mod sort;
pub mod substring;
diff --git a/arrow/src/compute/kernels/partition.rs b/arrow/src/compute/kernels/partition.rs
new file mode 100644
index 0000000..e91f80b
--- /dev/null
+++ b/arrow/src/compute/kernels/partition.rs
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines partition kernel for `ArrayRef`
+
+use crate::compute::kernels::sort::LexicographicalComparator;
+use crate::compute::SortColumn;
+use crate::error::{ArrowError, Result};
+use std::cmp::Ordering;
+use std::ops::Range;
+
+/// Given a list of already sorted columns, find partition ranges that would partition
+/// lexicographically equal values across columns.
+///
+/// Here LexicographicalComparator is used in conjunction with binary
+/// search so the columns *MUST* be pre-sorted already.
+///
+/// The returned vec would be of size k where k is cardinality of the sorted values; Consecutive
+/// values will be connected: (a, b) and (b, c), where start = 0 and end = n for the first and last
+/// range.
+pub fn lexicographical_partition_ranges(
+ columns: &[SortColumn],
+) -> Result<Vec<Range<usize>>> {
+ let partition_points = lexicographical_partition_points(columns)?;
+ Ok(partition_points
+ .iter()
+ .zip(partition_points[1..].iter())
+ .map(|(&start, &end)| Range { start, end })
+ .collect())
+}
+
+/// Given a list of already sorted columns, find partition ranges that would partition
+/// lexicographically equal values across columns.
+///
+/// Here LexicographicalComparator is used in conjunction with binary
+/// search so the columns *MUST* be pre-sorted already.
+///
+/// The returned vec would be of size k+1 where k is cardinality of the sorted values; the first and
+/// last value would be 0 and n.
+fn lexicographical_partition_points(columns: &[SortColumn]) -> Result<Vec<usize>> {
+ if columns.is_empty() {
+ return Err(ArrowError::InvalidArgumentError(
+ "Sort requires at least one column".to_string(),
+ ));
+ }
+ let row_count = columns[0].values.len();
+ if columns.iter().any(|item| item.values.len() != row_count) {
+ return Err(ArrowError::ComputeError(
+ "Lexical sort columns have different row counts".to_string(),
+ ));
+ };
+
+ let mut result = vec![];
+ if row_count == 0 {
+ return Ok(result);
+ }
+
+ let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
+ let value_indices = (0..row_count).collect::<Vec<usize>>();
+
+ let mut previous_partition_point = 0;
+ result.push(previous_partition_point);
+ while previous_partition_point < row_count {
+ // invariant:
+ // value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point]
+ // so in order to save time we can do binary search on the value_indices[previous_partition_point..]
+ // and find when any value is greater than value_indices[previous_partition_point]; because we are using
+ // new indices, the new offset is _added_ to the previous_partition_point.
+ //
+ // be careful that idx is of type &usize which points to the actual value within value_indices, which itself
+ // contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the
+ // original columnar data.
+ previous_partition_point += value_indices[previous_partition_point..]
+ .partition_point(|idx| {
+ lexicographical_comparator.compare(idx, &previous_partition_point)
+ != Ordering::Greater
+ });
+ result.push(previous_partition_point);
+ }
+
+ Ok(result)
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::*;
    use crate::compute::SortOptions;
    use crate::datatypes::DataType;
    use std::sync::Arc;

    #[test]
    fn test_lexicographical_partition_points_empty() {
        let input = vec![];
        assert!(
            lexicographical_partition_points(&input).is_err(),
            "lexicographical_partition_points should reject an empty list of columns"
        );
    }

    #[test]
    fn test_lexicographical_partition_points_zero_rows() -> Result<()> {
        // Columns that exist but have no rows produce no partition points at all.
        let input = vec![SortColumn {
            values: Arc::new(Int64Array::from(Vec::<i64>::new())) as ArrayRef,
            options: None,
        }];
        let results = lexicographical_partition_points(&input)?;
        assert!(results.is_empty());
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_points_unaligned_rows() {
        let input = vec![
            SortColumn {
                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef,
                options: None,
            },
            SortColumn {
                values: Arc::new(StringArray::from(vec![Some("foo")])) as ArrayRef,
                options: None,
            },
        ];
        assert!(
            lexicographical_partition_points(&input).is_err(),
            "lexicographical_partition_points should reject columns with different row counts"
        );
    }

    #[test]
    fn test_lexicographical_partition_single_column() -> Result<()> {
        let input = vec![SortColumn {
            values: Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]))
                as ArrayRef,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        }];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1, 8, 9], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(
                vec![(0_usize..1_usize), (1_usize..8_usize), (8_usize..9_usize)],
                results
            );
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_single_column_descending() -> Result<()> {
        // Data sorted in descending order must be partitioned with descending options.
        let input = vec![SortColumn {
            values: Arc::new(Int64Array::from(vec![9, 2, 2, 1])) as ArrayRef,
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        }];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1, 3, 4], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(
                vec![(0_usize..1_usize), (1_usize..3_usize), (3_usize..4_usize)],
                results
            );
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_all_equal_values() -> Result<()> {
        let input = vec![SortColumn {
            values: Arc::new(Int64Array::from_value(1, 1000)) as ArrayRef,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        }];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1000], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(vec![(0_usize..1000_usize)], results);
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_all_null_values() -> Result<()> {
        let input = vec![
            SortColumn {
                values: new_null_array(&DataType::Int8, 1000),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            },
            SortColumn {
                values: new_null_array(&DataType::UInt16, 1000),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
            },
        ];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1000], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(vec![(0_usize..1000_usize)], results);
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_unique_column_1() -> Result<()> {
        let input = vec![
            SortColumn {
                values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef,
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            },
            SortColumn {
                values: Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]))
                    as ArrayRef,
                options: Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
            },
        ];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1, 2], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(vec![(0_usize..1_usize), (1_usize..2_usize)], results);
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_unique_column_2() -> Result<()> {
        let input = vec![
            SortColumn {
                values: Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)]))
                    as ArrayRef,
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            },
            SortColumn {
                values: Arc::new(StringArray::from(vec![
                    Some("foo"),
                    Some("bar"),
                    Some("apple"),
                ])) as ArrayRef,
                options: Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
            },
        ];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1, 2, 3], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(
                vec![(0_usize..1_usize), (1_usize..2_usize), (2_usize..3_usize),],
                results
            );
        }
        Ok(())
    }

    #[test]
    fn test_lexicographical_partition_non_unique_column_1() -> Result<()> {
        let input = vec![
            SortColumn {
                values: Arc::new(Int64Array::from(vec![
                    None,
                    Some(-1),
                    Some(-1),
                    Some(1),
                ])) as ArrayRef,
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            },
            SortColumn {
                values: Arc::new(StringArray::from(vec![
                    Some("foo"),
                    Some("bar"),
                    Some("bar"),
                    Some("bar"),
                ])) as ArrayRef,
                options: Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
            },
        ];
        {
            let results = lexicographical_partition_points(&input)?;
            assert_eq!(vec![0, 1, 3, 4], results);
        }
        {
            let results = lexicographical_partition_ranges(&input)?;
            assert_eq!(
                vec![(0_usize..1_usize), (1_usize..3_usize), (3_usize..4_usize),],
                results
            );
        }
        Ok(())
    }
}
diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs
index 166f156..2b3b9a7 100644
--- a/arrow/src/compute/mod.rs
+++ b/arrow/src/compute/mod.rs
@@ -30,6 +30,7 @@ pub use self::kernels::comparison::*;
pub use self::kernels::concat::*;
pub use self::kernels::filter::*;
pub use self::kernels::limit::*;
+pub use self::kernels::partition::*;
pub use self::kernels::regexp::*;
pub use self::kernels::sort::*;
pub use self::kernels::take::*;