You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/02 11:59:49 UTC
[arrow-rs] branch master updated: Support List and LargeList in Row format (#3159) (#3251)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 983328852 Support List and LargeList in Row format (#3159) (#3251)
983328852 is described below
commit 9833288520c2e9ad353442170e3d2a8f27c6672d
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Dec 2 11:59:43 2022 +0000
Support List and LargeList in Row format (#3159) (#3251)
* Support List and LargeList in Row format (#3159)
* Clippy
* Update arrow/src/row/mod.rs
Co-authored-by: Marco Neumann <ma...@crepererum.net>
* Update arrow/src/row/list.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* More tests
* Tweak docs
Co-authored-by: Marco Neumann <ma...@crepererum.net>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
arrow/src/row/list.rs | 178 +++++++++++++++++++++++
arrow/src/row/mod.rs | 350 +++++++++++++++++++++++++++++++++++++++++++++-
arrow/src/row/variable.rs | 112 ++++++++-------
3 files changed, 584 insertions(+), 56 deletions(-)
diff --git a/arrow/src/row/list.rs b/arrow/src/row/list.rs
new file mode 100644
index 000000000..e5ea5c2a0
--- /dev/null
+++ b/arrow/src/row/list.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compute::SortOptions;
+use crate::row::{RowConverter, Rows, SortField};
+use arrow_array::builder::BufferBuilder;
+use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
+use arrow_data::ArrayDataBuilder;
+use arrow_schema::ArrowError;
+use std::ops::Range;
+
+pub fn compute_lengths<O: OffsetSizeTrait>(
+ lengths: &mut [usize],
+ rows: &Rows,
+ array: &GenericListArray<O>,
+) {
+ let offsets = array.value_offsets().windows(2);
+ lengths
+ .iter_mut()
+ .zip(offsets)
+ .enumerate()
+ .for_each(|(idx, (length, offsets))| {
+ let start = offsets[0].as_usize();
+ let end = offsets[1].as_usize();
+ let range = array.is_valid(idx).then_some(start..end);
+ *length += encoded_len(rows, range);
+ });
+}
+
+fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
+ match range {
+ None => 1,
+ Some(range) if range.start == range.end => 1,
+ Some(range) => {
+ let element_count = range.end - range.start;
+ let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::<usize>();
+ let total = (1 + element_count) * std::mem::size_of::<u32>() + row_bytes;
+ super::variable::padded_length(Some(total))
+ }
+ }
+}
+
+/// Encodes the provided `GenericListArray` to `out` with the provided `SortOptions`
+///
+/// `rows` should contain the encoded child elements
+pub fn encode<O: OffsetSizeTrait>(
+ out: &mut Rows,
+ rows: &Rows,
+ opts: SortOptions,
+ array: &GenericListArray<O>,
+) {
+ let mut temporary = vec![];
+ let offsets = array.value_offsets().windows(2);
+ out.offsets
+ .iter_mut()
+ .skip(1)
+ .zip(offsets)
+ .enumerate()
+ .for_each(|(idx, (offset, offsets))| {
+ let start = offsets[0].as_usize();
+ let end = offsets[1].as_usize();
+ let range = array.is_valid(idx).then_some(start..end);
+ let out = &mut out.buffer[*offset..];
+ *offset += encode_one(out, &mut temporary, rows, range, opts)
+ });
+}
+
+#[inline]
+fn encode_one(
+ out: &mut [u8],
+ temporary: &mut Vec<u8>,
+ rows: &Rows,
+ range: Option<Range<usize>>,
+ opts: SortOptions,
+) -> usize {
+ temporary.clear();
+
+ match range {
+ None => super::variable::encode_one(out, None, opts),
+ Some(range) if range.start == range.end => {
+ super::variable::encode_one(out, Some(&[]), opts)
+ }
+ Some(range) => {
+ for row in range.clone().map(|i| rows.row(i)) {
+ temporary.extend_from_slice(row.as_ref());
+ }
+ for row in range.clone().map(|i| rows.row(i)) {
+ let len: u32 = row
+ .as_ref()
+ .len()
+ .try_into()
+ .expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported");
+ temporary.extend_from_slice(&len.to_be_bytes());
+ }
+ let row_count: u32 = (range.end - range.start)
+ .try_into()
+ .expect("lists containing more than u32::MAX elements not supported");
+ temporary.extend_from_slice(&row_count.to_be_bytes());
+ super::variable::encode_one(out, Some(temporary), opts)
+ }
+ }
+}
+
+/// Decodes a list array from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<O: OffsetSizeTrait>(
+ converter: &RowConverter,
+ rows: &mut [&[u8]],
+ field: &SortField,
+ validate_utf8: bool,
+) -> Result<GenericListArray<O>, ArrowError> {
+ let canonical = super::variable::decode_binary::<i64>(rows, field.options);
+
+ let mut offsets = BufferBuilder::<O>::new(rows.len() + 1);
+ offsets.append(O::from_usize(0).unwrap());
+ let mut current_offset = 0;
+
+ let mut child_rows = Vec::with_capacity(rows.len());
+ canonical.value_offsets().windows(2).for_each(|w| {
+ let start = w[0] as usize;
+ let end = w[1] as usize;
+ if start == end {
+ // Null or empty list
+ offsets.append(O::from_usize(current_offset).unwrap());
+ return;
+ }
+
+ let row = &canonical.value_data()[start..end];
+ let element_count_start = row.len() - 4;
+ let element_count =
+ u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap())
+ as usize;
+
+ let lengths_start = element_count_start - (element_count * 4);
+ let mut row_offset = 0;
+ row[lengths_start..element_count_start]
+ .chunks_exact(4)
+ .for_each(|chunk| {
+ let len = u32::from_be_bytes(chunk.try_into().unwrap());
+ let next_row_offset = row_offset + len as usize;
+ child_rows.push(&row[row_offset..next_row_offset]);
+ row_offset = next_row_offset;
+ });
+
+ current_offset += element_count;
+ offsets.append(O::from_usize(current_offset).unwrap());
+ });
+
+ let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
+ assert_eq!(child.len(), 1);
+ let child_data = child[0].data().clone();
+
+ let builder = ArrayDataBuilder::new(field.data_type.clone())
+ .len(rows.len())
+ .null_count(canonical.null_count())
+ .null_bit_buffer(canonical.data().null_buffer().cloned())
+ .add_buffer(offsets.finish())
+ .add_child_data(child_data);
+
+ Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
+}
diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs
index cff49740f..abb8039cc 100644
--- a/arrow/src/row/mod.rs
+++ b/arrow/src/row/mod.rs
@@ -147,6 +147,7 @@ use crate::{downcast_dictionary_array, downcast_primitive_array};
mod dictionary;
mod fixed;
mod interner;
+mod list;
mod variable;
/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
@@ -343,6 +344,56 @@ mod variable;
/// └───────┴───────────────┴───────┴─────────┴───────┘
/// ```
///
+/// ## List Encoding
+///
+/// Lists are encoded by first encoding all child elements to the row format.
+///
+/// A "canonical byte array" is then constructed by concatenating the row
+/// encodings of all their elements into a single binary array, followed
+/// by the lengths of each encoded row, and the number of elements, encoded
+/// as big endian `u32`.
+///
+/// This canonical byte array is then encoded using the variable length byte
+/// encoding described above.
+///
+/// _The lengths are not strictly necessary but greatly simplify decode; they
+/// may be removed in a future iteration_.
+///
+/// For example given:
+///
+/// ```text
+/// [1_u8, 2_u8, 3_u8]
+/// [1_u8, null]
+/// []
+/// null
+/// ```
+///
+/// The elements would be converted to:
+///
+/// ```text
+/// ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐
+/// 1 │01│01│ 2 │01│02│ 3 │01│03│ 1 │01│01│ null │00│00│
+/// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘
+///```
+///
+/// Which would be grouped into the following canonical byte arrays:
+///
+/// ```text
+/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+/// [1_u8, 2_u8, 3_u8] │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
+/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+/// └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘
+///
+/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+/// [1_u8, null] │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
+/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///```
+///
+/// With `[]` represented by an empty byte array, and `null` a null byte array.
+///
+/// These byte arrays will then be encoded using the variable length byte encoding
+/// described above.
+///
/// # Ordering
///
/// ## Float Ordering
@@ -381,6 +432,8 @@ enum Codec {
/// A row converter for the child fields
/// and the encoding of a row containing only nulls
Struct(RowConverter, OwnedRow),
+ /// A row converter for the child field
+ List(RowConverter),
}
impl Codec {
@@ -388,6 +441,20 @@ impl Codec {
match &sort_field.data_type {
DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
d if !d.is_nested() => Ok(Self::Stateless),
+ DataType::List(f) | DataType::LargeList(f) => {
+ // The encoded contents will be inverted if descending is set to true
+ // As such we set `descending` to false and negate nulls first if it
+ // it set to true
+ let options = SortOptions {
+ descending: false,
+ nulls_first: sort_field.options.nulls_first
+ != sort_field.options.descending,
+ };
+
+ let field = SortField::new_with_options(f.data_type().clone(), options);
+ let converter = RowConverter::new(vec![field])?;
+ Ok(Self::List(converter))
+ }
DataType::Struct(f) => {
let sort_fields = f
.iter()
@@ -441,6 +508,15 @@ impl Codec {
let rows = converter.convert_columns(v.columns())?;
Ok(Encoder::Struct(rows, null.row()))
}
+ Codec::List(converter) => {
+ let values = match array.data_type() {
+ DataType::List(_) => as_list_array(array).values(),
+ DataType::LargeList(_) => as_large_list_array(array).values(),
+ _ => unreachable!(),
+ };
+ let rows = converter.convert_columns(&[values])?;
+ Ok(Encoder::List(rows))
+ }
}
}
@@ -449,6 +525,7 @@ impl Codec {
Codec::Stateless => 0,
Codec::Dictionary(interner) => interner.size(),
Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
+ Codec::List(converter) => converter.size(),
}
}
}
@@ -459,12 +536,14 @@ enum Encoder<'a> {
Stateless,
/// The mapping from dictionary keys to normalized keys
Dictionary(Vec<Option<&'a [u8]>>),
- /// The row encoding of the child array and the encoding of a null row
+ /// The row encoding of the child arrays and the encoding of a null row
///
/// It is necessary to encode to a temporary [`Rows`] to avoid serializing
/// values that are masked by a null in the parent StructArray, otherwise
/// this would establish an ordering between semantically null values
Struct(Rows, Row<'a>),
+ /// The row encoding of the child array
+ List(Rows),
}
/// Configure the data type and sort order for a given column
@@ -521,6 +600,9 @@ impl RowConverter {
fn supports_datatype(d: &DataType) -> bool {
match d {
_ if !d.is_nested() => true,
+ DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
+ Self::supports_datatype(f.data_type())
+ }
DataType::Struct(f) => {
f.iter().all(|x| Self::supports_datatype(x.data_type()))
}
@@ -571,7 +653,7 @@ impl RowConverter {
columns.iter().zip(self.fields.iter()).zip(encoders)
{
// We encode a column at a time to minimise dispatch overheads
- encode_column(&mut rows, column, field.options, &encoder)
+ encode_column(&mut rows, column.as_ref(), field.options, &encoder)
}
if cfg!(debug_assertions) {
@@ -975,6 +1057,15 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
}
});
}
+ Encoder::List(rows) => match array.data_type() {
+ DataType::List(_) => {
+ list::compute_lengths(&mut lengths, rows, as_list_array(array))
+ }
+ DataType::LargeList(_) => {
+ list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
+ }
+ _ => unreachable!(),
+ },
}
}
@@ -1014,7 +1105,7 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
fn encode_column(
out: &mut Rows,
- column: &ArrayRef,
+ column: &dyn Array,
opts: SortOptions,
encoder: &Encoder<'_>,
) {
@@ -1056,7 +1147,7 @@ fn encode_column(
}
}
Encoder::Struct(rows, null) => {
- let array = as_struct_array(column.as_ref());
+ let array = as_struct_array(column);
let null_sentinel = null_sentinel(opts);
out.offsets
.iter_mut()
@@ -1073,6 +1164,13 @@ fn encode_column(
*offset = end_offset;
})
}
+ Encoder::List(rows) => match column.data_type() {
+ DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+ DataType::LargeList(_) => {
+ list::encode(out, rows, opts, as_large_list_array(column))
+ }
+ _ => unreachable!(),
+ },
}
}
@@ -1165,6 +1263,15 @@ unsafe fn decode_column(
Arc::new(StructArray::from(builder.build_unchecked()))
}
+ Codec::List(converter) => match &field.data_type {
+ DataType::List(_) => {
+ Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
+ }
+ DataType::LargeList(_) => {
+ Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
+ }
+ _ => unreachable!(),
+ },
};
Ok(array)
}
@@ -1173,7 +1280,9 @@ unsafe fn decode_column(
mod tests {
use std::sync::Arc;
- use arrow_array::builder::FixedSizeBinaryBuilder;
+ use arrow_array::builder::{
+ FixedSizeBinaryBuilder, GenericListBuilder, Int32Builder,
+ };
use rand::distributions::uniform::SampleUniform;
use rand::distributions::{Distribution, Standard};
use rand::{thread_rng, Rng};
@@ -1542,6 +1651,24 @@ mod tests {
let cols = converter.convert_rows(&rows_c).unwrap();
assert_eq!(&cols[0], &a);
+
+ let mut converter = RowConverter::new(vec![SortField::new_with_options(
+ a.data_type().clone(),
+ SortOptions {
+ descending: true,
+ nulls_first: true,
+ },
+ )])
+ .unwrap();
+
+ let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
+ assert!(rows_c.row(3) < rows_c.row(5));
+ assert!(rows_c.row(2) > rows_c.row(1));
+ assert!(rows_c.row(0) > rows_c.row(1));
+ assert!(rows_c.row(3) < rows_c.row(0));
+
+ let cols = converter.convert_rows(&rows_c).unwrap();
+ assert_eq!(&cols[0], &a);
}
#[test]
@@ -1671,6 +1798,219 @@ mod tests {
let _ = converter.convert_rows(&rows);
}
+ fn test_single_list<O: OffsetSizeTrait>() {
+ let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
+ builder.values().append_value(32);
+ builder.values().append_value(52);
+ builder.values().append_value(32);
+ builder.append(true);
+ builder.values().append_value(32);
+ builder.values().append_value(52);
+ builder.values().append_value(12);
+ builder.append(true);
+ builder.values().append_value(32);
+ builder.values().append_value(52);
+ builder.append(true);
+ builder.values().append_value(32); // MASKED
+ builder.values().append_value(52); // MASKED
+ builder.append(false);
+ builder.values().append_value(32);
+ builder.values().append_null();
+ builder.append(true);
+ builder.append(true);
+
+ let list = Arc::new(builder.finish()) as ArrayRef;
+ let d = list.data_type().clone();
+
+ let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+ assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+ assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+ assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+ assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+ assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
+ assert!(rows.row(3) < rows.row(5)); // null < []
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let field = SortField::new_with_options(d.clone(), options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+ assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+ assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+ assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+ assert!(rows.row(3) > rows.row(5)); // null > []
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+
+ let options = SortOptions {
+ descending: true,
+ nulls_first: false,
+ };
+ let field = SortField::new_with_options(d.clone(), options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+ assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+ assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+ assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+ assert!(rows.row(3) > rows.row(5)); // null > []
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+
+ let options = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let field = SortField::new_with_options(d, options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+ assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+ assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+ assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+ assert!(rows.row(3) < rows.row(5)); // null < []
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+ }
+
+ fn test_nested_list<O: OffsetSizeTrait>() {
+ let mut builder = GenericListBuilder::<O, _>::new(
+ GenericListBuilder::<O, _>::new(Int32Builder::new()),
+ );
+
+ builder.values().values().append_value(1);
+ builder.values().values().append_value(2);
+ builder.values().append(true);
+ builder.values().values().append_value(1);
+ builder.values().values().append_null();
+ builder.values().append(true);
+ builder.append(true);
+
+ builder.values().values().append_value(1);
+ builder.values().values().append_null();
+ builder.values().append(true);
+ builder.values().values().append_value(1);
+ builder.values().values().append_null();
+ builder.values().append(true);
+ builder.append(true);
+
+ builder.values().values().append_value(1);
+ builder.values().values().append_null();
+ builder.values().append(true);
+ builder.values().append(false);
+ builder.append(true);
+ builder.append(false);
+
+ builder.values().values().append_value(1);
+ builder.values().values().append_value(2);
+ builder.values().append(true);
+ builder.append(true);
+
+ let list = Arc::new(builder.finish()) as ArrayRef;
+ let d = list.data_type().clone();
+
+ // [
+ // [[1, 2], [1, null]],
+ // [[1, null], [1, null]],
+ // [[1, null], null]
+ // null
+ // [[1, 2]]
+ // ]
+ let options = SortOptions {
+ descending: false,
+ nulls_first: true,
+ };
+ let field = SortField::new_with_options(d.clone(), options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) > rows.row(1));
+ assert!(rows.row(1) > rows.row(2));
+ assert!(rows.row(2) > rows.row(3));
+ assert!(rows.row(4) < rows.row(0));
+ assert!(rows.row(4) > rows.row(1));
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+
+ let options = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let field = SortField::new_with_options(d.clone(), options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) > rows.row(1));
+ assert!(rows.row(1) > rows.row(2));
+ assert!(rows.row(2) > rows.row(3));
+ assert!(rows.row(4) > rows.row(0));
+ assert!(rows.row(4) > rows.row(1));
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+
+ let options = SortOptions {
+ descending: true,
+ nulls_first: false,
+ };
+ let field = SortField::new_with_options(d, options);
+ let mut converter = RowConverter::new(vec![field]).unwrap();
+ let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+ assert!(rows.row(0) < rows.row(1));
+ assert!(rows.row(1) < rows.row(2));
+ assert!(rows.row(2) < rows.row(3));
+ assert!(rows.row(4) > rows.row(0));
+ assert!(rows.row(4) < rows.row(1));
+
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(back.len(), 1);
+ back[0].data().validate_full().unwrap();
+ assert_eq!(&back[0], &list);
+ }
+
+ #[test]
+ fn test_list() {
+ test_single_list::<i32>();
+ test_nested_list::<i32>();
+ }
+
+ #[test]
+ fn test_large_list() {
+ test_single_list::<i64>();
+ test_nested_list::<i64>();
+ }
+
fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
where
K: ArrowPrimitiveType,
diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs
index 3aa0b4839..9162f2312 100644
--- a/arrow/src/row/variable.rs
+++ b/arrow/src/row/variable.rs
@@ -37,9 +37,16 @@ pub const EMPTY_SENTINEL: u8 = 1;
pub const NON_EMPTY_SENTINEL: u8 = 2;
/// Returns the length of the encoded representation of a byte array, including the null byte
+#[inline]
pub fn encoded_len(a: Option<&[u8]>) -> usize {
+ padded_length(a.map(|x| x.len()))
+}
+
+/// Returns the padded length of the encoded length of the given length
+#[inline]
+pub fn padded_length(a: Option<usize>) -> usize {
match a {
- Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1),
+ Some(a) => 1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
None => 1,
}
}
@@ -61,59 +68,62 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
opts: SortOptions,
) {
for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
- match maybe_val {
- Some(val) if val.is_empty() => {
- out.buffer[*offset] = match opts.descending {
- true => !EMPTY_SENTINEL,
- false => EMPTY_SENTINEL,
- };
- *offset += 1;
+ *offset += encode_one(&mut out.buffer[*offset..], maybe_val, opts);
+ }
+}
+
+pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
+ match val {
+ Some(val) if val.is_empty() => {
+ out[0] = match opts.descending {
+ true => !EMPTY_SENTINEL,
+ false => EMPTY_SENTINEL,
+ };
+ 1
+ }
+ Some(val) => {
+ let block_count = ceil(val.len(), BLOCK_SIZE);
+ let end_offset = 1 + block_count * (BLOCK_SIZE + 1);
+ let to_write = &mut out[..end_offset];
+
+ // Write `2_u8` to demarcate as non-empty, non-null string
+ to_write[0] = NON_EMPTY_SENTINEL;
+
+ let chunks = val.chunks_exact(BLOCK_SIZE);
+ let remainder = chunks.remainder();
+ for (input, output) in chunks
+ .clone()
+ .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
+ {
+ let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
+ let out_block: &mut [u8; BLOCK_SIZE] =
+ (&mut output[..BLOCK_SIZE]).try_into().unwrap();
+
+ *out_block = *input;
+
+ // Indicate that there are further blocks to follow
+ output[BLOCK_SIZE] = BLOCK_CONTINUATION;
}
- Some(val) => {
- let block_count = ceil(val.len(), BLOCK_SIZE);
- let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1);
- let to_write = &mut out.buffer[*offset..end_offset];
-
- // Write `2_u8` to demarcate as non-empty, non-null string
- to_write[0] = NON_EMPTY_SENTINEL;
-
- let chunks = val.chunks_exact(BLOCK_SIZE);
- let remainder = chunks.remainder();
- for (input, output) in chunks
- .clone()
- .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
- {
- let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
- let out_block: &mut [u8; BLOCK_SIZE] =
- (&mut output[..BLOCK_SIZE]).try_into().unwrap();
-
- *out_block = *input;
-
- // Indicate that there are further blocks to follow
- output[BLOCK_SIZE] = BLOCK_CONTINUATION;
- }
-
- if !remainder.is_empty() {
- let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
- to_write[start_offset..start_offset + remainder.len()]
- .copy_from_slice(remainder);
- *to_write.last_mut().unwrap() = remainder.len() as u8;
- } else {
- // We must overwrite the continuation marker written by the loop above
- *to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
- }
-
- *offset = end_offset;
-
- if opts.descending {
- // Invert bits
- to_write.iter_mut().for_each(|v| *v = !*v)
- }
+
+ if !remainder.is_empty() {
+ let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
+ to_write[start_offset..start_offset + remainder.len()]
+ .copy_from_slice(remainder);
+ *to_write.last_mut().unwrap() = remainder.len() as u8;
+ } else {
+ // We must overwrite the continuation marker written by the loop above
+ *to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
}
- None => {
- out.buffer[*offset] = null_sentinel(opts);
- *offset += 1;
+
+ if opts.descending {
+ // Invert bits
+ to_write.iter_mut().for_each(|v| *v = !*v)
}
+ end_offset
+ }
+ None => {
+ out[0] = null_sentinel(opts);
+ 1
}
}
}