You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/02 11:59:49 UTC

[arrow-rs] branch master updated: Support List and LargeList in Row format (#3159) (#3251)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 983328852 Support List and LargeList in Row format (#3159) (#3251)
983328852 is described below

commit 9833288520c2e9ad353442170e3d2a8f27c6672d
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Dec 2 11:59:43 2022 +0000

    Support List and LargeList in Row format (#3159) (#3251)
    
    * Support List and LargeList in Row format (#3159)
    
    * Clippy
    
    * Update arrow/src/row/mod.rs
    
    Co-authored-by: Marco Neumann <ma...@crepererum.net>
    
    * Update arrow/src/row/list.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * More tests
    
    * Tweak docs
    
    Co-authored-by: Marco Neumann <ma...@crepererum.net>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 arrow/src/row/list.rs     | 178 +++++++++++++++++++++++
 arrow/src/row/mod.rs      | 350 +++++++++++++++++++++++++++++++++++++++++++++-
 arrow/src/row/variable.rs | 112 ++++++++-------
 3 files changed, 584 insertions(+), 56 deletions(-)

diff --git a/arrow/src/row/list.rs b/arrow/src/row/list.rs
new file mode 100644
index 000000000..e5ea5c2a0
--- /dev/null
+++ b/arrow/src/row/list.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compute::SortOptions;
+use crate::row::{RowConverter, Rows, SortField};
+use arrow_array::builder::BufferBuilder;
+use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
+use arrow_data::ArrayDataBuilder;
+use arrow_schema::ArrowError;
+use std::ops::Range;
+
+pub fn compute_lengths<O: OffsetSizeTrait>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &GenericListArray<O>,
+) {
+    let offsets = array.value_offsets().windows(2);
+    lengths
+        .iter_mut()
+        .zip(offsets)
+        .enumerate()
+        .for_each(|(idx, (length, offsets))| {
+            let start = offsets[0].as_usize();
+            let end = offsets[1].as_usize();
+            let range = array.is_valid(idx).then_some(start..end);
+            *length += encoded_len(rows, range);
+        });
+}
+
+fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
+    match range {
+        None => 1,
+        Some(range) if range.start == range.end => 1,
+        Some(range) => {
+            let element_count = range.end - range.start;
+            let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::<usize>();
+            let total = (1 + element_count) * std::mem::size_of::<u32>() + row_bytes;
+            super::variable::padded_length(Some(total))
+        }
+    }
+}
+
+/// Encodes the provided `GenericListArray` to `out` with the provided `SortOptions`
+///
+/// `rows` should contain the encoded child elements
+pub fn encode<O: OffsetSizeTrait>(
+    out: &mut Rows,
+    rows: &Rows,
+    opts: SortOptions,
+    array: &GenericListArray<O>,
+) {
+    let mut temporary = vec![];
+    let offsets = array.value_offsets().windows(2);
+    out.offsets
+        .iter_mut()
+        .skip(1)
+        .zip(offsets)
+        .enumerate()
+        .for_each(|(idx, (offset, offsets))| {
+            let start = offsets[0].as_usize();
+            let end = offsets[1].as_usize();
+            let range = array.is_valid(idx).then_some(start..end);
+            let out = &mut out.buffer[*offset..];
+            *offset += encode_one(out, &mut temporary, rows, range, opts)
+        });
+}
+
+#[inline]
+fn encode_one(
+    out: &mut [u8],
+    temporary: &mut Vec<u8>,
+    rows: &Rows,
+    range: Option<Range<usize>>,
+    opts: SortOptions,
+) -> usize {
+    temporary.clear();
+
+    match range {
+        None => super::variable::encode_one(out, None, opts),
+        Some(range) if range.start == range.end => {
+            super::variable::encode_one(out, Some(&[]), opts)
+        }
+        Some(range) => {
+            for row in range.clone().map(|i| rows.row(i)) {
+                temporary.extend_from_slice(row.as_ref());
+            }
+            for row in range.clone().map(|i| rows.row(i)) {
+                let len: u32 = row
+                    .as_ref()
+                    .len()
+                    .try_into()
+                    .expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported");
+                temporary.extend_from_slice(&len.to_be_bytes());
+            }
+            let row_count: u32 = (range.end - range.start)
+                .try_into()
+                .expect("lists containing more than u32::MAX elements not supported");
+            temporary.extend_from_slice(&row_count.to_be_bytes());
+            super::variable::encode_one(out, Some(temporary), opts)
+        }
+    }
+}
+
+/// Decodes a list array from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<O: OffsetSizeTrait>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<GenericListArray<O>, ArrowError> {
+    let canonical = super::variable::decode_binary::<i64>(rows, field.options);
+
+    let mut offsets = BufferBuilder::<O>::new(rows.len() + 1);
+    offsets.append(O::from_usize(0).unwrap());
+    let mut current_offset = 0;
+
+    let mut child_rows = Vec::with_capacity(rows.len());
+    canonical.value_offsets().windows(2).for_each(|w| {
+        let start = w[0] as usize;
+        let end = w[1] as usize;
+        if start == end {
+            // Null or empty list
+            offsets.append(O::from_usize(current_offset).unwrap());
+            return;
+        }
+
+        let row = &canonical.value_data()[start..end];
+        let element_count_start = row.len() - 4;
+        let element_count =
+            u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap())
+                as usize;
+
+        let lengths_start = element_count_start - (element_count * 4);
+        let mut row_offset = 0;
+        row[lengths_start..element_count_start]
+            .chunks_exact(4)
+            .for_each(|chunk| {
+                let len = u32::from_be_bytes(chunk.try_into().unwrap());
+                let next_row_offset = row_offset + len as usize;
+                child_rows.push(&row[row_offset..next_row_offset]);
+                row_offset = next_row_offset;
+            });
+
+        current_offset += element_count;
+        offsets.append(O::from_usize(current_offset).unwrap());
+    });
+
+    let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
+    assert_eq!(child.len(), 1);
+    let child_data = child[0].data().clone();
+
+    let builder = ArrayDataBuilder::new(field.data_type.clone())
+        .len(rows.len())
+        .null_count(canonical.null_count())
+        .null_bit_buffer(canonical.data().null_buffer().cloned())
+        .add_buffer(offsets.finish())
+        .add_child_data(child_data);
+
+    Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
+}
diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs
index cff49740f..abb8039cc 100644
--- a/arrow/src/row/mod.rs
+++ b/arrow/src/row/mod.rs
@@ -147,6 +147,7 @@ use crate::{downcast_dictionary_array, downcast_primitive_array};
 mod dictionary;
 mod fixed;
 mod interner;
+mod list;
 mod variable;
 
 /// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
@@ -343,6 +344,56 @@ mod variable;
 /// └───────┴───────────────┴───────┴─────────┴───────┘
 /// ```
 ///
+/// ## List Encoding
+///
+/// Lists are encoded by first encoding all child elements to the row format.
+///
+/// A "canonical byte array" is then constructed by concatenating the row
+/// encodings of all their elements into a single binary array, followed
+/// by the lengths of each encoded row, and the number of elements, encoded
+/// as big endian `u32`.
+///
+/// This canonical byte array is then encoded using the variable length byte
+/// encoding described above.
+///
+/// _The lengths are not strictly necessary, but they greatly simplify decoding;
+/// they may be removed in a future iteration_.
+///
+/// For example given:
+///
+/// ```text
+/// [1_u8, 2_u8, 3_u8]
+/// [1_u8, null]
+/// []
+/// null
+/// ```
+///
+/// The elements would be converted to:
+///
+/// ```text
+///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
+///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
+///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
+///```
+///
+/// Which would be grouped into the following canonical byte arrays:
+///
+/// ```text
+///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [1_u8, 2_u8, 3_u8]     │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
+///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///                          └──── rows ────┘   └───────── row lengths ─────────┘  └─ count ─┘
+///
+///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [1_u8, null]           │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
+///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///```
+///
+/// With `[]` represented by an empty byte array, and `null` a null byte array.
+///
+/// These byte arrays will then be encoded using the variable length byte encoding
+/// described above.
+///
 /// # Ordering
 ///
 /// ## Float Ordering
@@ -381,6 +432,8 @@ enum Codec {
     /// A row converter for the child fields
     /// and the encoding of a row containing only nulls
     Struct(RowConverter, OwnedRow),
+    /// A row converter for the child field
+    List(RowConverter),
 }
 
 impl Codec {
@@ -388,6 +441,20 @@ impl Codec {
         match &sort_field.data_type {
             DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
             d if !d.is_nested() => Ok(Self::Stateless),
+            DataType::List(f) | DataType::LargeList(f) => {
+                // The encoded contents will be inverted if descending is set to true
+                // As such we set `descending` to false and negate nulls first
+                // if it is set to true
+                let options = SortOptions {
+                    descending: false,
+                    nulls_first: sort_field.options.nulls_first
+                        != sort_field.options.descending,
+                };
+
+                let field = SortField::new_with_options(f.data_type().clone(), options);
+                let converter = RowConverter::new(vec![field])?;
+                Ok(Self::List(converter))
+            }
             DataType::Struct(f) => {
                 let sort_fields = f
                     .iter()
@@ -441,6 +508,15 @@ impl Codec {
                 let rows = converter.convert_columns(v.columns())?;
                 Ok(Encoder::Struct(rows, null.row()))
             }
+            Codec::List(converter) => {
+                let values = match array.data_type() {
+                    DataType::List(_) => as_list_array(array).values(),
+                    DataType::LargeList(_) => as_large_list_array(array).values(),
+                    _ => unreachable!(),
+                };
+                let rows = converter.convert_columns(&[values])?;
+                Ok(Encoder::List(rows))
+            }
         }
     }
 
@@ -449,6 +525,7 @@ impl Codec {
             Codec::Stateless => 0,
             Codec::Dictionary(interner) => interner.size(),
             Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
+            Codec::List(converter) => converter.size(),
         }
     }
 }
@@ -459,12 +536,14 @@ enum Encoder<'a> {
     Stateless,
     /// The mapping from dictionary keys to normalized keys
     Dictionary(Vec<Option<&'a [u8]>>),
-    /// The row encoding of the child array and the encoding of a null row
+    /// The row encoding of the child arrays and the encoding of a null row
     ///
     /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
     /// values that are masked by a null in the parent StructArray, otherwise
     /// this would establish an ordering between semantically null values
     Struct(Rows, Row<'a>),
+    /// The row encoding of the child array
+    List(Rows),
 }
 
 /// Configure the data type and sort order for a given column
@@ -521,6 +600,9 @@ impl RowConverter {
     fn supports_datatype(d: &DataType) -> bool {
         match d {
             _ if !d.is_nested() => true,
+            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
+                Self::supports_datatype(f.data_type())
+            }
             DataType::Struct(f) => {
                 f.iter().all(|x| Self::supports_datatype(x.data_type()))
             }
@@ -571,7 +653,7 @@ impl RowConverter {
             columns.iter().zip(self.fields.iter()).zip(encoders)
         {
             // We encode a column at a time to minimise dispatch overheads
-            encode_column(&mut rows, column, field.options, &encoder)
+            encode_column(&mut rows, column.as_ref(), field.options, &encoder)
         }
 
         if cfg!(debug_assertions) {
@@ -975,6 +1057,15 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
                     }
                 });
             }
+            Encoder::List(rows) => match array.data_type() {
+                DataType::List(_) => {
+                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
+                }
+                DataType::LargeList(_) => {
+                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
+                }
+                _ => unreachable!(),
+            },
         }
     }
 
@@ -1014,7 +1105,7 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
 /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
 fn encode_column(
     out: &mut Rows,
-    column: &ArrayRef,
+    column: &dyn Array,
     opts: SortOptions,
     encoder: &Encoder<'_>,
 ) {
@@ -1056,7 +1147,7 @@ fn encode_column(
             }
         }
         Encoder::Struct(rows, null) => {
-            let array = as_struct_array(column.as_ref());
+            let array = as_struct_array(column);
             let null_sentinel = null_sentinel(opts);
             out.offsets
                 .iter_mut()
@@ -1073,6 +1164,13 @@ fn encode_column(
                     *offset = end_offset;
                 })
         }
+        Encoder::List(rows) => match column.data_type() {
+            DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+            DataType::LargeList(_) => {
+                list::encode(out, rows, opts, as_large_list_array(column))
+            }
+            _ => unreachable!(),
+        },
     }
 }
 
@@ -1165,6 +1263,15 @@ unsafe fn decode_column(
 
             Arc::new(StructArray::from(builder.build_unchecked()))
         }
+        Codec::List(converter) => match &field.data_type {
+            DataType::List(_) => {
+                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
+            }
+            DataType::LargeList(_) => {
+                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
+            }
+            _ => unreachable!(),
+        },
     };
     Ok(array)
 }
@@ -1173,7 +1280,9 @@ unsafe fn decode_column(
 mod tests {
     use std::sync::Arc;
 
-    use arrow_array::builder::FixedSizeBinaryBuilder;
+    use arrow_array::builder::{
+        FixedSizeBinaryBuilder, GenericListBuilder, Int32Builder,
+    };
     use rand::distributions::uniform::SampleUniform;
     use rand::distributions::{Distribution, Standard};
     use rand::{thread_rng, Rng};
@@ -1542,6 +1651,24 @@ mod tests {
 
         let cols = converter.convert_rows(&rows_c).unwrap();
         assert_eq!(&cols[0], &a);
+
+        let mut converter = RowConverter::new(vec![SortField::new_with_options(
+            a.data_type().clone(),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
+        assert!(rows_c.row(3) < rows_c.row(5));
+        assert!(rows_c.row(2) > rows_c.row(1));
+        assert!(rows_c.row(0) > rows_c.row(1));
+        assert!(rows_c.row(3) < rows_c.row(0));
+
+        let cols = converter.convert_rows(&rows_c).unwrap();
+        assert_eq!(&cols[0], &a);
     }
 
     #[test]
@@ -1671,6 +1798,219 @@ mod tests {
         let _ = converter.convert_rows(&rows);
     }
 
+    fn test_single_list<O: OffsetSizeTrait>() {
+        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(32);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(12);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.append(true);
+        builder.values().append_value(32); // MASKED
+        builder.values().append_value(52); // MASKED
+        builder.append(false);
+        builder.values().append_value(32);
+        builder.values().append_null();
+        builder.append(true);
+        builder.append(true);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 42] < [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 42]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 42]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 42]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 42] < [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 42]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 42]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 42]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 42] > [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 42]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 42]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 42]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d, options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 42] > [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 42]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 42]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 42]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+    }
+
+    fn test_nested_list<O: OffsetSizeTrait>() {
+        let mut builder = GenericListBuilder::<O, _>::new(
+            GenericListBuilder::<O, _>::new(Int32Builder::new()),
+        );
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().append(false);
+        builder.append(true);
+        builder.append(false);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.append(true);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        // [
+        //   [[1, 2], [1, null]],
+        //   [[1, null], [1, null]],
+        //   [[1, null], null]
+        //   null
+        //   [[1, 2]]
+        // ]
+        let options = SortOptions {
+            descending: false,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) < rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d, options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(1) < rows.row(2));
+        assert!(rows.row(2) < rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) < rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+    }
+
+    #[test]
+    fn test_list() {
+        test_single_list::<i32>();
+        test_nested_list::<i32>();
+    }
+
+    #[test]
+    fn test_large_list() {
+        test_single_list::<i64>();
+        test_nested_list::<i64>();
+    }
+
     fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
     where
         K: ArrowPrimitiveType,
diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs
index 3aa0b4839..9162f2312 100644
--- a/arrow/src/row/variable.rs
+++ b/arrow/src/row/variable.rs
@@ -37,9 +37,16 @@ pub const EMPTY_SENTINEL: u8 = 1;
 pub const NON_EMPTY_SENTINEL: u8 = 2;
 
 /// Returns the length of the encoded representation of a byte array, including the null byte
+#[inline]
 pub fn encoded_len(a: Option<&[u8]>) -> usize {
+    padded_length(a.map(|x| x.len()))
+}
+
+/// Returns the padded length of the encoded length of the given length
+#[inline]
+pub fn padded_length(a: Option<usize>) -> usize {
     match a {
-        Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1),
+        Some(a) => 1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
         None => 1,
     }
 }
@@ -61,59 +68,62 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
     opts: SortOptions,
 ) {
     for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
-        match maybe_val {
-            Some(val) if val.is_empty() => {
-                out.buffer[*offset] = match opts.descending {
-                    true => !EMPTY_SENTINEL,
-                    false => EMPTY_SENTINEL,
-                };
-                *offset += 1;
+        *offset += encode_one(&mut out.buffer[*offset..], maybe_val, opts);
+    }
+}
+
+pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
+    match val {
+        Some(val) if val.is_empty() => {
+            out[0] = match opts.descending {
+                true => !EMPTY_SENTINEL,
+                false => EMPTY_SENTINEL,
+            };
+            1
+        }
+        Some(val) => {
+            let block_count = ceil(val.len(), BLOCK_SIZE);
+            let end_offset = 1 + block_count * (BLOCK_SIZE + 1);
+            let to_write = &mut out[..end_offset];
+
+            // Write `2_u8` to demarcate as non-empty, non-null string
+            to_write[0] = NON_EMPTY_SENTINEL;
+
+            let chunks = val.chunks_exact(BLOCK_SIZE);
+            let remainder = chunks.remainder();
+            for (input, output) in chunks
+                .clone()
+                .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
+            {
+                let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
+                let out_block: &mut [u8; BLOCK_SIZE] =
+                    (&mut output[..BLOCK_SIZE]).try_into().unwrap();
+
+                *out_block = *input;
+
+                // Indicate that there are further blocks to follow
+                output[BLOCK_SIZE] = BLOCK_CONTINUATION;
             }
-            Some(val) => {
-                let block_count = ceil(val.len(), BLOCK_SIZE);
-                let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1);
-                let to_write = &mut out.buffer[*offset..end_offset];
-
-                // Write `2_u8` to demarcate as non-empty, non-null string
-                to_write[0] = NON_EMPTY_SENTINEL;
-
-                let chunks = val.chunks_exact(BLOCK_SIZE);
-                let remainder = chunks.remainder();
-                for (input, output) in chunks
-                    .clone()
-                    .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
-                {
-                    let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
-                    let out_block: &mut [u8; BLOCK_SIZE] =
-                        (&mut output[..BLOCK_SIZE]).try_into().unwrap();
-
-                    *out_block = *input;
-
-                    // Indicate that there are further blocks to follow
-                    output[BLOCK_SIZE] = BLOCK_CONTINUATION;
-                }
-
-                if !remainder.is_empty() {
-                    let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
-                    to_write[start_offset..start_offset + remainder.len()]
-                        .copy_from_slice(remainder);
-                    *to_write.last_mut().unwrap() = remainder.len() as u8;
-                } else {
-                    // We must overwrite the continuation marker written by the loop above
-                    *to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
-                }
-
-                *offset = end_offset;
-
-                if opts.descending {
-                    // Invert bits
-                    to_write.iter_mut().for_each(|v| *v = !*v)
-                }
+
+            if !remainder.is_empty() {
+                let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
+                to_write[start_offset..start_offset + remainder.len()]
+                    .copy_from_slice(remainder);
+                *to_write.last_mut().unwrap() = remainder.len() as u8;
+            } else {
+                // We must overwrite the continuation marker written by the loop above
+                *to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
             }
-            None => {
-                out.buffer[*offset] = null_sentinel(opts);
-                *offset += 1;
+
+            if opts.descending {
+                // Invert bits
+                to_write.iter_mut().for_each(|v| *v = !*v)
             }
+            end_offset
+        }
+        None => {
+            out[0] = null_sentinel(opts);
+            1
         }
     }
 }