You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/25 14:04:31 UTC
[arrow-rs] branch master updated: feat: Add `RunEndEncodedArray` (#3553)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f0be9da82 feat: Add `RunEndEncodedArray` (#3553)
f0be9da82 is described below
commit f0be9da82cbd76da3042b426daf6c424c9560d93
Author: askoa <11...@users.noreply.github.com>
AuthorDate: Wed Jan 25 09:04:25 2023 -0500
feat: Add `RunEndEncodedArray` (#3553)
* Add `RunEndEncodedArray`
* fix doctest and clippy issues
* fix doc issues
* fix doc issue
* add validation for run_ends array and corresponding tests
* PR comments
* seal ArrowRunEndIndexType per PR suggestion
* Fix PR suggestions
* few more PR comments
* run array name change
* fix doc issues
* doc change
* lint fix
* make append methods infallible
* fix array.len and other minor changes
* formatting fix
* add validation of array len
* fmt fix
* PR comment and some documentation changes
* pr suggestion
* empty commit
Co-authored-by: ask <as...@local>
---
arrow-array/src/array/mod.rs | 18 +
arrow-array/src/array/run_array.rs | 507 ++++++++++++++++++++
.../src/builder/generic_byte_run_builder.rs | 519 +++++++++++++++++++++
arrow-array/src/builder/mod.rs | 4 +
arrow-array/src/builder/primitive_run_builder.rs | 294 ++++++++++++
arrow-array/src/types.rs | 25 +
arrow-data/src/data.rs | 98 +++-
arrow-data/src/equal/mod.rs | 1 +
arrow-data/src/transform/mod.rs | 16 +
arrow-integration-test/src/datatype.rs | 1 +
arrow-ipc/src/convert.rs | 1 +
arrow-schema/src/datatype.rs | 23 +
arrow-schema/src/error.rs | 4 +
arrow-schema/src/field.rs | 1 +
parquet/src/arrow/arrow_writer/mod.rs | 2 +-
parquet/src/arrow/schema/mod.rs | 1 +
16 files changed, 1511 insertions(+), 4 deletions(-)
diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 1e17e35d0..69f6ba4d8 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -64,6 +64,9 @@ pub use struct_array::*;
mod union_array;
pub use union_array::*;
+mod run_array;
+pub use run_array::*;
+
/// Trait for dealing with different types of array at runtime when the type of the
/// array is not known in advance.
pub trait Array: std::fmt::Debug + Send + Sync {
@@ -579,6 +582,20 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
}
dt => panic!("Unexpected dictionary key type {:?}", dt),
},
+ DataType::RunEndEncoded(ref run_ends_type, _) => {
+ match run_ends_type.data_type() {
+ DataType::Int16 => {
+ Arc::new(RunArray::<Int16Type>::from(data)) as ArrayRef
+ }
+ DataType::Int32 => {
+ Arc::new(RunArray::<Int32Type>::from(data)) as ArrayRef
+ }
+ DataType::Int64 => {
+ Arc::new(RunArray::<Int64Type>::from(data)) as ArrayRef
+ }
+ dt => panic!("Unexpected data type for run_ends array {:?}", dt),
+ }
+ }
DataType::Null => Arc::new(NullArray::from(data)) as ArrayRef,
DataType::Decimal128(_, _) => Arc::new(Decimal128Array::from(data)) as ArrayRef,
DataType::Decimal256(_, _) => Arc::new(Decimal256Array::from(data)) as ArrayRef,
@@ -737,6 +754,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
new_null_sized_decimal(data_type, length, std::mem::size_of::<i128>())
}
DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32),
+ DataType::RunEndEncoded(_, _) => todo!(),
}
}
diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
new file mode 100644
index 000000000..0e39cd288
--- /dev/null
+++ b/arrow-array/src/array/run_array.rs
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use arrow_buffer::ArrowNativeType;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType, Field};
+
+use crate::{
+ builder::StringRunBuilder,
+ make_array,
+ types::{Int16Type, Int32Type, Int64Type, RunEndIndexType},
+ Array, ArrayRef, PrimitiveArray,
+};
+
+/// An array stored with run-end encoding (REE), a variation of
+/// [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding).
+///
+/// This encoding is well suited to data in which the same value is repeated
+/// consecutively.
+///
+/// A [`RunArray`] holds two child arrays of equal length: `run_ends` and `values`.
+/// `run_ends[i]` is the (exclusive) logical index at which run `i` ends, and
+/// `values[i]` is the value repeated throughout that run. When built with
+/// [`RunArray::try_new`], the logical length reported by [`Array::len`] is the last
+/// value of `run_ends` (or 0 when `run_ends` is empty). The example below shows how
+/// a logical array is represented in a [`RunArray`]:
+///
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐
+/// ┌─────────────────┐ ┌─────────┐ ┌─────────────────┐
+/// │ │ A │ │ 2 │ │ │ A │
+/// ├─────────────────┤ ├─────────┤ ├─────────────────┤
+/// │ │ D │ │ 3 │ │ │ A │ run length of 'A' = run_ends[0] - 0 = 2
+/// ├─────────────────┤ ├─────────┤ ├─────────────────┤
+/// │ │ B │ │ 6 │ │ │ D │ run length of 'D' = run_ends[1] - run_ends[0] = 1
+/// └─────────────────┘ └─────────┘ ├─────────────────┤
+/// │ values run_ends │ │ B │
+/// ├─────────────────┤
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ │ B │
+/// ├─────────────────┤
+/// RunArray │ B │ run length of 'B' = run_ends[2] - run_ends[1] = 3
+/// length = 6 └─────────────────┘
+///
+/// Logical array
+/// Contents
+/// ```
+pub struct RunArray<R: RunEndIndexType> {
+    /// The `RunEndEncoded` array data; its two children are `run_ends` and `values`.
+    data: ArrayData,
+    /// Typed view of the first child: the run end index of each run (expected to be
+    /// non-null, positive and strictly increasing).
+    run_ends: PrimitiveArray<R>,
+    /// The second child: the value of each run.
+    values: ArrayRef,
+}
+
+impl<R: RunEndIndexType> RunArray<R> {
+    /// Returns the logical length encoded by `run_ends`: the value of its last
+    /// element, or `0` when `run_ends` is empty.
+    fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
+        match run_ends.len() {
+            0 => 0,
+            n => run_ends.value(n - 1).as_usize(),
+        }
+    }
+
+    /// Creates a [`RunArray`] from `run_ends` (the logical index at which each run
+    /// ends) and `values` (the value of each run).
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the combined data does not satisfy the RunEndEncoded
+    /// specification, for example when `run_ends` contains nulls, is not strictly
+    /// positive and increasing, or has a different length than `values`.
+    pub fn try_new(
+        run_ends: &PrimitiveArray<R>,
+        values: &dyn Array,
+    ) -> Result<Self, ArrowError> {
+        let run_ends_field =
+            Field::new("run_ends", run_ends.data_type().clone(), false);
+        let values_field = Field::new("values", values.data_type().clone(), true);
+        let data_type =
+            DataType::RunEndEncoded(Box::new(run_ends_field), Box::new(values_field));
+
+        let builder = ArrayDataBuilder::new(data_type)
+            .len(Self::logical_len(run_ends))
+            .add_child_data(run_ends.data().clone())
+            .add_child_data(values.data().clone());
+
+        // `build_unchecked` is used to avoid recursively re-validating the children.
+        // SAFETY: the unchecked data is not handed out until `validate_data` below
+        // has confirmed that there are exactly two children, that run_ends has a
+        // valid type, contains no nulls and is strictly positive and increasing,
+        // and that run_ends and values have the same length.
+        let array_data = unsafe { builder.build_unchecked() };
+        array_data.validate_data()?;
+
+        Ok(array_data.into())
+    }
+
+    /// Returns the `run_ends` child array.
+    ///
+    /// Note: any slicing of this array is not applied to the returned array
+    /// and must be handled separately.
+    pub fn run_ends(&self) -> &PrimitiveArray<R> {
+        &self.run_ends
+    }
+
+    /// Returns the `values` child array.
+    pub fn values(&self) -> &ArrayRef {
+        &self.values
+    }
+}
+
+impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
+    /// Wraps `RunEndEncoded` array data in a [`RunArray`].
+    ///
+    /// The caller is expected to have validated `data` already, for example with
+    /// `ArrayData::validate_data()`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data` is not of type `DataType::RunEndEncoded`, if it has fewer
+    /// than two children, or if its first child is not a `PrimitiveArray<R>`.
+    fn from(data: ArrayData) -> Self {
+        assert!(
+            matches!(data.data_type(), DataType::RunEndEncoded(_, _)),
+            "Invalid data type for RunArray. The data type should be DataType::RunEndEncoded"
+        );
+
+        let children = data.child_data();
+        let run_ends = PrimitiveArray::<R>::from(children[0].clone());
+        let values = make_array(children[1].clone());
+
+        Self {
+            data,
+            run_ends,
+            values,
+        }
+    }
+}
+
+impl<R: RunEndIndexType> From<RunArray<R>> for ArrayData {
+    /// Unwraps the [`RunArray`], returning its underlying [`ArrayData`].
+    fn from(array: RunArray<R>) -> Self {
+        let RunArray { data, .. } = array;
+        data
+    }
+}
+
+impl<R: RunEndIndexType> Array for RunArray<R> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data(&self) -> &ArrayData {
+        &self.data
+    }
+
+    /// Consumes the array and returns its underlying [`ArrayData`].
+    fn into_data(self) -> ArrayData {
+        self.data
+    }
+}
+
+impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
+    /// Writes both child arrays using their `Debug` output, followed by a newline.
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let Self {
+            run_ends, values, ..
+        } = self;
+        writeln!(
+            f,
+            "RunArray {{run_ends: {:?}, values: {:?}}}",
+            run_ends, values
+        )
+    }
+}
+
+/// Builds a `RunArray` from an iterator of optional strings; `None` entries
+/// become null runs.
+///
+/// # Example
+/// ```
+/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
+///
+/// let test = vec!["a", "a", "b", "c", "c"];
+/// let array: RunArray<Int16Type> = test
+///     .iter()
+///     .map(|&x| if x == "b" { None } else { Some(x) })
+///     .collect();
+/// assert_eq!(
+///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
+///     format!("{:?}", array)
+/// );
+/// ```
+impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
+    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
+        let iter = iter.into_iter();
+        // Reserve capacity from the iterator's lower size hint (there is at most one
+        // run per item) plus 256 bytes of value data.
+        let mut builder = StringRunBuilder::with_capacity(iter.size_hint().0, 256);
+        for value in iter {
+            builder.append_option(value);
+        }
+        builder.finish()
+    }
+}
+
+/// Builds a `RunArray` from an iterator of strings; consecutive equal strings
+/// share a single run.
+///
+/// # Example
+///
+/// ```
+/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
+///
+/// let test = vec!["a", "a", "b", "c"];
+/// let array: RunArray<Int16Type> = test.into_iter().collect();
+/// assert_eq!(
+///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
+///     format!("{:?}", array)
+/// );
+/// ```
+impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
+    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
+        let iter = iter.into_iter();
+        // Reserve capacity from the iterator's lower size hint (there is at most one
+        // run per item) plus 256 bytes of value data.
+        let mut builder = StringRunBuilder::with_capacity(iter.size_hint().0, 256);
+        for value in iter {
+            builder.append_value(value);
+        }
+        builder.finish()
+    }
+}
+
+/// A [`RunArray`] whose run ends are stored as `i16` ([`Int16Type`]).
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int16RunArray, Int16Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int16RunArray = RunArray<Int16Type>;
+
+/// A [`RunArray`] whose run ends are stored as `i32` ([`Int32Type`]).
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int32RunArray, Int32Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int32RunArray = RunArray<Int32Type>;
+
+/// A [`RunArray`] whose run ends are stored as `i64` ([`Int64Type`]).
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int64RunArray, Int64Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int64RunArray = RunArray<Int64Type>;
+
+// Unit tests for RunArray construction, validation and Debug formatting.
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use super::*;
+ use crate::builder::PrimitiveRunBuilder;
+ use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
+ use crate::{Array, Int16Array, Int32Array, StringArray};
+
+ // Builds a RunArray from explicit children and checks its logical length and
+ // that both children round-trip unchanged.
+ #[test]
+ fn test_run_array() {
+ // Construct a value array
+ let value_data = PrimitiveArray::<Int8Type>::from_iter_values([
+ 10_i8, 11, 12, 13, 14, 15, 16, 17,
+ ]);
+
+ // Construct a run_ends array:
+ let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
+ 4_i16, 6, 7, 9, 13, 18, 20, 22,
+ ]);
+
+ // Construct a run ends encoded array from the above two
+ let ree_array =
+ RunArray::<Int16Type>::try_new(&run_ends_data, &value_data).unwrap();
+
+ // The logical length is the last run end (22), not the number of runs.
+ assert_eq!(ree_array.len(), 22);
+ assert_eq!(ree_array.null_count(), 0);
+
+ let values = ree_array.values();
+ assert_eq!(&value_data.into_data(), values.data());
+ assert_eq!(&DataType::Int8, values.data_type());
+
+ let run_ends = ree_array.run_ends();
+ assert_eq!(&run_ends_data.into_data(), run_ends.data());
+ assert_eq!(&DataType::Int16, run_ends.data_type());
+ }
+
+ // Debug output lists both children; repeated values collapse into a single run.
+ #[test]
+ fn test_run_array_fmt_debug() {
+ let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(3);
+ builder.append_value(12345678);
+ builder.append_null();
+ builder.append_value(22345678);
+ let array = builder.finish();
+ assert_eq!(
+ "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
+ format!("{:?}", array)
+ );
+
+ let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(20);
+ for _ in 0..20 {
+ builder.append_value(1);
+ }
+ let array = builder.finish();
+
+ assert_eq!(array.len(), 20);
+ assert_eq!(array.null_count(), 0);
+
+ assert_eq!(
+ "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 20,\n], values: PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
+ format!("{:?}", array)
+ );
+ }
+
+ // A None entry forms its own null run in `values`; the RunArray itself still
+ // reports a null_count of 0.
+ #[test]
+ fn test_run_array_from_iter() {
+ let test = vec!["a", "a", "b", "c"];
+ let array: RunArray<Int16Type> = test
+ .iter()
+ .map(|&x| if x == "b" { None } else { Some(x) })
+ .collect();
+ assert_eq!(
+ "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
+ format!("{:?}", array)
+ );
+
+ assert_eq!(array.len(), 4);
+ assert_eq!(array.null_count(), 0);
+
+ let array: RunArray<Int16Type> = test.into_iter().collect();
+ assert_eq!(
+ "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
+ format!("{:?}", array)
+ );
+ }
+
+ // Distinct consecutive values produce one run per value.
+ #[test]
+ fn test_run_array_run_ends_as_primitive_array() {
+ let test = vec!["a", "b", "c", "a"];
+ let array: RunArray<Int16Type> = test.into_iter().collect();
+
+ assert_eq!(array.len(), 4);
+ assert_eq!(array.null_count(), 0);
+
+ let run_ends = array.run_ends();
+ assert_eq!(&DataType::Int16, run_ends.data_type());
+ assert_eq!(0, run_ends.null_count());
+ assert_eq!(&[1, 2, 3, 4], run_ends.values());
+ }
+
+ // Consecutive nulls share a single null run in `values`.
+ #[test]
+ fn test_run_array_as_primitive_array_with_null() {
+ let test = vec![Some("a"), None, Some("b"), None, None, Some("a")];
+ let array: RunArray<Int32Type> = test.into_iter().collect();
+
+ assert_eq!(array.len(), 6);
+ assert_eq!(array.null_count(), 0);
+
+ let run_ends = array.run_ends();
+ assert_eq!(&DataType::Int32, run_ends.data_type());
+ assert_eq!(0, run_ends.null_count());
+ assert_eq!(5, run_ends.len());
+ assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());
+
+ let values_data = array.values();
+ assert_eq!(2, values_data.null_count());
+ assert_eq!(5, values_data.len());
+ }
+
+ // An all-null input collapses into a single null run.
+ #[test]
+ fn test_run_array_all_nulls() {
+ let test = vec![None, None, None];
+ let array: RunArray<Int32Type> = test.into_iter().collect();
+
+ assert_eq!(array.len(), 3);
+ assert_eq!(array.null_count(), 0);
+
+ let run_ends = array.run_ends();
+ assert_eq!(1, run_ends.len());
+ assert_eq!(&[3], run_ends.values());
+
+ let values_data = array.values();
+ assert_eq!(1, values_data.null_count());
+ }
+
+ // `try_new` accepts one value per run; nulls live in `values`, not in the
+ // RunArray itself.
+ #[test]
+ fn test_run_array_try_new() {
+ let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
+ .into_iter()
+ .collect();
+ let run_ends: Int32Array =
+ [Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
+
+ let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
+ assert_eq!(array.run_ends().data_type(), &DataType::Int32);
+ assert_eq!(array.values().data_type(), &DataType::Utf8);
+
+ assert_eq!(array.null_count(), 0);
+ assert_eq!(array.len(), 4);
+ assert_eq!(array.run_ends.null_count(), 0);
+ assert_eq!(array.values().null_count(), 1);
+
+ assert_eq!(
+ "RunArray {run_ends: PrimitiveArray<Int32>\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
+ format!("{:?}", array)
+ );
+ }
+
+ // The Int16RunArray alias collects strings into i16 run ends.
+ #[test]
+ fn test_run_array_int16_type_definition() {
+ let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+ let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+ assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+ assert_eq!(array.values(), &values);
+ }
+
+ // Empty strings are ordinary values and form their own run.
+ #[test]
+ fn test_run_array_empty_string() {
+ let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
+ let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));
+ assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
+ assert_eq!(array.values(), &values);
+ }
+
+ // `try_new` rejects run_ends and values of different lengths.
+ #[test]
+ fn test_run_array_length_mismatch() {
+ let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
+ .into_iter()
+ .collect();
+ let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect();
+
+ let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+ let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string());
+ assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+ }
+
+ // `try_new` rejects null entries in run_ends.
+ #[test]
+ fn test_run_array_run_ends_with_null() {
+ let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+ .into_iter()
+ .collect();
+ let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect();
+
+ let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+ let expected = ArrowError::InvalidArgumentError("Found null values in run_ends array. The run_ends array should not have null values.".to_string());
+ assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+ }
+
+ // `try_new` rejects a run end of zero: run ends must be strictly positive.
+ #[test]
+ fn test_run_array_run_ends_with_zeroes() {
+ let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+ .into_iter()
+ .collect();
+ let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect();
+
+ let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+ let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string());
+ assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+ }
+
+ // `try_new` rejects run ends that are not strictly increasing.
+ #[test]
+ fn test_run_array_run_ends_non_increasing() {
+ let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+ .into_iter()
+ .collect();
+ let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect();
+
+ let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+ let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string());
+ assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+ }
+
+ // Converting ArrayData whose run_ends type differs from `R` panics.
+ #[test]
+ #[should_panic(
+ expected = "PrimitiveArray expected ArrayData with type Int64 got Int32"
+ )]
+ fn test_run_array_run_ends_data_type_mismatch() {
+ let a = RunArray::<Int32Type>::from_iter(["32"]);
+ let _ = RunArray::<Int64Type>::from(a.into_data());
+ }
+}
diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs
new file mode 100644
index 000000000..c1ecbcb5d
--- /dev/null
+++ b/arrow-array/src/builder/generic_byte_run_builder.rs
@@ -0,0 +1,519 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::types::bytes::ByteArrayNativeType;
+use std::{any::Any, sync::Arc};
+
+use crate::{
+ types::{
+ BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType,
+ Utf8Type,
+ },
+ ArrayRef, ArrowPrimitiveType, RunArray,
+};
+
+use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
+
+use arrow_buffer::ArrowNativeType;
+
+/// Array builder for [`RunArray`] for String and Binary types.
+///
+/// Consecutive equal values are stored once, as a single run, and consecutive
+/// nulls form a single null run. See [`StringRunBuilder`], [`LargeStringRunBuilder`],
+/// [`BinaryRunBuilder`] and [`LargeBinaryRunBuilder`] for the concrete value types.
+///
+/// # Example:
+///
+/// ```
+///
+/// # use arrow_array::builder::GenericByteRunBuilder;
+/// # use arrow_array::{GenericByteArray, BinaryArray};
+/// # use arrow_array::types::{BinaryType, Int16Type};
+/// # use arrow_array::{Array, Int16Array};
+/// # use arrow_array::cast::as_generic_binary_array;
+///
+/// let mut builder =
+/// GenericByteRunBuilder::<Int16Type, BinaryType>::new();
+/// builder.append_value(b"abc");
+/// builder.append_value(b"abc");
+/// builder.append_null();
+/// builder.append_value(b"def");
+/// let array = builder.finish();
+///
+/// assert_eq!(
+/// array.run_ends(),
+/// &Int16Array::from(vec![Some(2), Some(3), Some(4)])
+/// );
+///
+/// let av = array.values();
+///
+/// assert!(!av.is_null(0));
+/// assert!(av.is_null(1));
+/// assert!(!av.is_null(2));
+///
+/// // Values are polymorphic and so require a downcast.
+/// let ava: &BinaryArray = as_generic_binary_array(av.as_ref());
+///
+/// assert_eq!(ava.value(0), b"abc");
+/// assert_eq!(ava.value(2), b"def");
+/// ```
+#[derive(Debug)]
+pub struct GenericByteRunBuilder<R, V>
+where
+ R: ArrowPrimitiveType,
+ V: ByteArrayType,
+{
+ /// Run end (exclusive logical index) of each completed run.
+ run_ends_builder: PrimitiveBuilder<R>,
+ /// Value of each completed run; null for null runs.
+ values_builder: GenericByteBuilder<V>,
+ /// Bytes of the value of the run currently being built (empty for a null run).
+ current_value: Vec<u8>,
+ /// Whether the current run holds a value; `false` means a null run, or no run yet.
+ has_current_value: bool,
+ /// Logical length appended so far, i.e. where the current run would end.
+ current_run_end_index: usize,
+ /// Logical index at which the last completed run ended.
+ prev_run_end_index: usize,
+}
+
+impl<R, V> Default for GenericByteRunBuilder<R, V>
+where
+    R: ArrowPrimitiveType,
+    V: ByteArrayType,
+{
+    /// Equivalent to [`GenericByteRunBuilder::new`].
+    fn default() -> Self {
+        GenericByteRunBuilder::new()
+    }
+}
+
+impl<R, V> GenericByteRunBuilder<R, V>
+where
+    R: ArrowPrimitiveType,
+    V: ByteArrayType,
+{
+    /// Creates an empty `GenericByteRunBuilder` with default capacities.
+    pub fn new() -> Self {
+        let run_ends_builder = PrimitiveBuilder::new();
+        let values_builder = GenericByteBuilder::<V>::new();
+        Self::from_builders(run_ends_builder, values_builder)
+    }
+
+    /// Creates an empty `GenericByteRunBuilder` with space reserved up front.
+    ///
+    /// * `capacity`: the expected number of runs; used as the capacity of both the
+    ///   run ends builder and the values builder.
+    /// * `data_capacity`: the expected total number of bytes of the run values.
+    pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self {
+        Self::from_builders(
+            PrimitiveBuilder::with_capacity(capacity),
+            GenericByteBuilder::<V>::with_capacity(capacity, data_capacity),
+        )
+    }
+
+    /// Assembles a builder with no runs from the given child builders.
+    fn from_builders(
+        run_ends_builder: PrimitiveBuilder<R>,
+        values_builder: GenericByteBuilder<V>,
+    ) -> Self {
+        Self {
+            run_ends_builder,
+            values_builder,
+            current_value: Vec::new(),
+            has_current_value: false,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+}
+
+impl<R, V> ArrayBuilder for GenericByteRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ByteArrayType,
+{
+    /// Returns the builder as a non-mutable `Any` reference.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+
+    /// Returns the logical length appended so far (values plus nulls), which is
+    /// not the number of runs.
+    fn len(&self) -> usize {
+        self.current_run_end_index
+    }
+
+    /// Returns `true` if nothing has been appended since creation or the last reset.
+    fn is_empty(&self) -> bool {
+        self.current_run_end_index == 0
+    }
+
+    /// Builds the array and resets this builder.
+    fn finish(&mut self) -> ArrayRef {
+        // Resolves to the inherent `finish`, which returns a typed `RunArray`.
+        let array: RunArray<R> = self.finish();
+        Arc::new(array)
+    }
+
+    /// Builds the array without resetting the builder.
+    fn finish_cloned(&self) -> ArrayRef {
+        // Resolves to the inherent `finish_cloned`, which returns a typed `RunArray`.
+        let array: RunArray<R> = self.finish_cloned();
+        Arc::new(array)
+    }
+}
+
+impl<R, V> GenericByteRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ByteArrayType,
+{
+    /// Appends an optional value to the logical array encoded by the RunArray.
+    /// `Some` behaves like [`Self::append_value`] and `None` like [`Self::append_null`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if closing the previous run needs a run end that does not fit in
+    /// `R::Native`.
+    pub fn append_option(&mut self, input_value: Option<impl AsRef<V::Native>>) {
+        match input_value {
+            Some(value) => self.append_value(value),
+            None => self.append_null(),
+        }
+    }
+
+    /// Appends a value to the logical array encoded by the RunArray.
+    ///
+    /// The current run is extended if it holds an equal value; otherwise the
+    /// current run is closed and a new run is started for `input_value`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if closing the previous run needs a run end that does not fit in
+    /// `R::Native`.
+    pub fn append_value(&mut self, input_value: impl AsRef<V::Native>) {
+        let value: &[u8] = input_value.as_ref().as_ref();
+        if !self.has_current_value {
+            // Close any pending null run, then start a run holding `value`.
+            self.append_run_end();
+            self.current_value.extend_from_slice(value);
+            self.has_current_value = true;
+        } else if self.current_value.as_slice() != value {
+            // The value changed: close the current run and start a new one.
+            self.append_run_end();
+            self.current_value.clear();
+            self.current_value.extend_from_slice(value);
+        }
+        self.current_run_end_index += 1;
+    }
+
+    /// Appends a null to the logical array encoded by the RunArray.
+    /// Consecutive nulls share a single null run.
+    ///
+    /// # Panics
+    ///
+    /// Panics if closing the previous run needs a run end that does not fit in
+    /// `R::Native`.
+    pub fn append_null(&mut self) {
+        if self.has_current_value {
+            self.append_run_end();
+            self.current_value.clear();
+            self.has_current_value = false;
+        }
+        self.current_run_end_index += 1;
+    }
+
+    /// Creates the RunArray and resets the builder.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the RunArray cannot be built, or if the final run end does not
+    /// fit in `R::Native`.
+    pub fn finish(&mut self) -> RunArray<R> {
+        // write the last run end to the array.
+        self.append_run_end();
+
+        // reset the builder's run state.
+        self.current_value.clear();
+        self.has_current_value = false;
+        self.current_run_end_index = 0;
+        self.prev_run_end_index = 0;
+
+        // build the run encoded array by adding run_ends and values array as its children.
+        let run_ends_array = self.run_ends_builder.finish();
+        let values_array = self.values_builder.finish();
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    /// Creates the RunArray without resetting the builder.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the RunArray cannot be built, or if the pending run end does not
+    /// fit in `R::Native`.
+    pub fn finish_cloned(&self) -> RunArray<R> {
+        let mut run_ends_array = self.run_ends_builder.finish_cloned();
+        let mut values_array = self.values_builder.finish_cloned();
+
+        // Add current run if one exists
+        if self.prev_run_end_index != self.current_run_end_index {
+            let mut run_end_builder = run_ends_array.into_builder().unwrap();
+            let mut values_builder = values_array.into_builder().unwrap();
+            self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
+            run_ends_array = run_end_builder.finish();
+            values_array = values_builder.finish();
+        }
+
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    // Closes the current run, if it is non-empty, by pushing it onto this
+    // builder's own child builders.
+    fn append_run_end(&mut self) {
+        // Empty builder, or nothing appended since the last run end was written.
+        if self.prev_run_end_index == self.current_run_end_index {
+            return;
+        }
+        let run_end = self.run_end_index_as_native();
+        // Borrow individual fields so the child builders can be borrowed mutably.
+        let value = if self.has_current_value {
+            Some(self.current_value.as_slice())
+        } else {
+            None
+        };
+        Self::push_run(
+            &mut self.run_ends_builder,
+            &mut self.values_builder,
+            run_end,
+            value,
+        );
+        self.prev_run_end_index = self.current_run_end_index;
+    }
+
+    // Like `append_run_end`, but pushes the current run onto the given builders
+    // instead of `self`'s, so `finish_cloned` does not need to mutate `self`.
+    // The caller is responsible for checking that the current run is non-empty.
+    fn append_run_end_with_builders(
+        &self,
+        run_ends_builder: &mut PrimitiveBuilder<R>,
+        values_builder: &mut GenericByteBuilder<V>,
+    ) {
+        let value = if self.has_current_value {
+            Some(self.current_value.as_slice())
+        } else {
+            None
+        };
+        Self::push_run(
+            run_ends_builder,
+            values_builder,
+            self.run_end_index_as_native(),
+            value,
+        );
+    }
+
+    // Pushes one run ending at `run_end` onto the given builders. `value` holds
+    // the run's bytes, or `None` for a null run.
+    fn push_run(
+        run_ends_builder: &mut PrimitiveBuilder<R>,
+        values_builder: &mut GenericByteBuilder<V>,
+        run_end: R::Native,
+        value: Option<&[u8]>,
+    ) {
+        run_ends_builder.append_value(run_end);
+        match value {
+            Some(bytes) => {
+                // SAFETY: `bytes` is always `current_value`, which `append_value`
+                // copied from the bytes of a `V::Native`, so it can be converted
+                // back to `V::Native` without validation.
+                let native = unsafe { V::Native::from_bytes_unchecked(bytes) };
+                values_builder.append_value(native);
+            }
+            None => values_builder.append_null(),
+        }
+    }
+
+    // Converts `current_run_end_index` to `R::Native`, panicking if it does not fit.
+    fn run_end_index_as_native(&self) -> R::Native {
+        R::Native::from_usize(self.current_run_end_index)
+            .unwrap_or_else(|| panic!(
+                "Cannot convert the value {} from `usize` to native form of arrow datatype {}",
+                self.current_run_end_index,
+                R::DATA_TYPE
+            ))
+    }
+}
+
+/// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]).
+///
+/// ```
+/// // Create a run-end encoded array with run-end indexes data type as `i16`.
+/// // The encoded values are Strings.
+///
+/// # use arrow_array::builder::StringRunBuilder;
+/// # use arrow_array::{Int16Array, StringArray};
+/// # use arrow_array::types::Int16Type;
+/// # use arrow_array::cast::as_string_array;
+///
+/// let mut builder = StringRunBuilder::<Int16Type>::new();
+///
+/// // The builder builds the run array value by value; consecutive equal values
+/// // share a single run.
+/// builder.append_value("abc");
+/// builder.append_null();
+/// builder.append_value("def");
+/// builder.append_value("def");
+/// builder.append_value("abc");
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///     array.run_ends(),
+///     &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+/// );
+///
+/// // Values are polymorphic and so require a downcast.
+/// let av = array.values();
+/// let ava: &StringArray = as_string_array(av.as_ref());
+///
+/// assert_eq!(ava.value(0), "abc");
+/// assert!(av.is_null(1));
+/// assert_eq!(ava.value(2), "def");
+/// assert_eq!(ava.value(3), "abc");
+///
+/// ```
+pub type StringRunBuilder<K> = GenericByteRunBuilder<K, Utf8Type>;
+
+/// Array builder for [`RunArray`] that encodes large strings ([`LargeUtf8Type`]). See [`StringRunBuilder`] for an example.
+pub type LargeStringRunBuilder<K> = GenericByteRunBuilder<K, LargeUtf8Type>;
+
+/// Array builder for [`RunArray`] that encodes binary values ([`BinaryType`]).
+///
+/// ```
+/// // Create a run-end encoded array with run-end indexes data type as `i16`.
+/// // The encoded data is binary values.
+///
+/// # use arrow_array::builder::BinaryRunBuilder;
+/// # use arrow_array::{BinaryArray, Int16Array};
+/// # use arrow_array::types::Int16Type;
+/// # use arrow_array::cast::as_generic_binary_array;
+///
+/// let mut builder = BinaryRunBuilder::<Int16Type>::new();
+///
+/// // The builder collapses consecutive equal values (and consecutive nulls)
+/// // into a single run.
+/// builder.append_value(b"abc");
+/// builder.append_null();
+/// builder.append_value(b"def");
+/// builder.append_value(b"def");
+/// builder.append_value(b"abc");
+/// let array = builder.finish();
+///
+/// // One run end per run: the logical index just past the run's last slot.
+/// assert_eq!(
+///     array.run_ends(),
+///     &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+/// );
+///
+/// // Values are polymorphic and so require a downcast.
+/// let av = array.values();
+/// let ava: &BinaryArray = as_generic_binary_array::<i32>(av.as_ref());
+///
+/// assert_eq!(ava.value(0), b"abc");
+/// assert!(av.is_null(1));
+/// assert_eq!(ava.value(2), b"def");
+/// assert_eq!(ava.value(3), b"abc");
+/// ```
+pub type BinaryRunBuilder<K> = GenericByteRunBuilder<K, BinaryType>;
+
+/// Array builder for [`RunArray`] that encodes large binary values ([`LargeBinaryType`]).
+///
+/// Usage mirrors [`BinaryRunBuilder`]; see its documentation for an example.
+pub type LargeBinaryRunBuilder<K> = GenericByteRunBuilder<K, LargeBinaryType>;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::array::Array;
+    use crate::types::Int16Type;
+    use crate::GenericByteArray;
+    use crate::Int16Array;
+    use crate::Int16RunArray;
+
+    /// Appends a run of `values[0]`, a run of two nulls, a run of `values[1]`
+    /// and a run of `values[2]`, then checks the run ends and per-run values.
+    fn test_bytes_run_builder<T>(values: Vec<&T::Native>)
+    where
+        T: ByteArrayType,
+        <T as ByteArrayType>::Native: PartialEq,
+        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
+    {
+        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_null();
+        builder.append_null();
+        builder.append_value(values[1]);
+        builder.append_value(values[1]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        let array = builder.finish();
+
+        assert_eq!(array.len(), 11);
+        // Nulls are recorded in the values child, not at the run-array level.
+        assert_eq!(array.null_count(), 0);
+
+        assert_eq!(
+            array.run_ends(),
+            &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)])
+        );
+
+        // Values are polymorphic and so require a downcast.
+        let av = array.values();
+        let ava: &GenericByteArray<T> =
+            av.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(*ava.value(0), *values[0]);
+        assert!(ava.is_null(1));
+        assert_eq!(*ava.value(2), *values[1]);
+        assert_eq!(*ava.value(3), *values[2]);
+    }
+
+    #[test]
+    fn test_string_run_builder() {
+        test_bytes_run_builder::<Utf8Type>(vec!["abc", "def", "ghi"]);
+    }
+
+    #[test]
+    fn test_string_run_builder_with_empty_strings() {
+        test_bytes_run_builder::<Utf8Type>(vec!["abc", "", "ghi"]);
+    }
+
+    #[test]
+    fn test_binary_run_builder() {
+        test_bytes_run_builder::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
+    }
+
+    /// Checks that `finish_cloned` snapshots the builder without closing the
+    /// open run, so values appended afterwards still merge into that run.
+    fn test_bytes_run_builder_finish_cloned<T>(values: Vec<&T::Native>)
+    where
+        T: ByteArrayType,
+        <T as ByteArrayType>::Native: PartialEq,
+        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
+    {
+        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
+
+        builder.append_value(values[0]);
+        builder.append_null();
+        builder.append_value(values[1]);
+        builder.append_value(values[1]);
+        builder.append_value(values[0]);
+        let mut array: Int16RunArray = builder.finish_cloned();
+
+        assert_eq!(array.len(), 5);
+        assert_eq!(array.null_count(), 0);
+
+        assert_eq!(
+            array.run_ends(),
+            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+        );
+
+        // Values are polymorphic and so require a downcast.
+        let av = array.values();
+        let ava: &GenericByteArray<T> =
+            av.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(ava.value(0), values[0]);
+        assert!(ava.is_null(1));
+        assert_eq!(ava.value(2), values[1]);
+        assert_eq!(ava.value(3), values[0]);
+
+        // Append the last value before `finish_cloned` (`values[0]`) again and ensure
+        // it has only one entry in the final output.
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_value(values[1]);
+        array = builder.finish();
+
+        assert_eq!(array.len(), 8);
+        assert_eq!(array.null_count(), 0);
+
+        assert_eq!(
+            array.run_ends(),
+            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), Some(8),])
+        );
+
+        // Values are polymorphic and so require a downcast.
+        let av2 = array.values();
+        let ava2: &GenericByteArray<T> =
+            av2.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(ava2.value(0), values[0]);
+        assert!(ava2.is_null(1));
+        assert_eq!(ava2.value(2), values[1]);
+        // The value appended before and after `finish_cloned` has only one entry.
+        assert_eq!(ava2.value(3), values[0]);
+        assert_eq!(ava2.value(4), values[1]);
+    }
+
+    #[test]
+    fn test_string_run_builder_finish_cloned() {
+        test_bytes_run_builder_finish_cloned::<Utf8Type>(vec!["abc", "def", "ghi"]);
+    }
+
+    #[test]
+    fn test_binary_run_builder_finish_cloned() {
+        test_bytes_run_builder_finish_cloned::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
+    }
+}
diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs
index 820ecd23b..fc2454635 100644
--- a/arrow-array/src/builder/mod.rs
+++ b/arrow-array/src/builder/mod.rs
@@ -39,10 +39,14 @@ mod primitive_builder;
pub use primitive_builder::*;
mod primitive_dictionary_builder;
pub use primitive_dictionary_builder::*;
+mod primitive_run_builder;
+pub use primitive_run_builder::*;
mod struct_builder;
pub use struct_builder::*;
mod generic_bytes_dictionary_builder;
pub use generic_bytes_dictionary_builder::*;
+mod generic_byte_run_builder;
+pub use generic_byte_run_builder::*;
mod union_builder;
pub use union_builder::*;
diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs
new file mode 100644
index 000000000..82c46abfa
--- /dev/null
+++ b/arrow-array/src/builder/primitive_run_builder.rs
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use crate::{types::RunEndIndexType, ArrayRef, ArrowPrimitiveType, RunArray};
+
+use super::{ArrayBuilder, PrimitiveBuilder};
+
+use arrow_buffer::ArrowNativeType;
+
+/// Array builder for [`RunArray`] that encodes primitive values.
+///
+/// Consecutive equal values (and consecutive nulls) are collapsed into a single
+/// run. Every run contributes exactly one entry to the run-ends child and one
+/// entry to the values child.
+///
+/// # Example
+///
+/// ```
+/// # use arrow_array::builder::PrimitiveRunBuilder;
+/// # use arrow_array::cast::as_primitive_array;
+/// # use arrow_array::types::{UInt32Type, Int16Type};
+/// # use arrow_array::{Array, UInt32Array, Int16Array};
+///
+/// let mut builder =
+///     PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
+/// builder.append_value(1234);
+/// builder.append_value(1234);
+/// builder.append_value(1234);
+/// builder.append_null();
+/// builder.append_value(5678);
+/// builder.append_value(5678);
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///     array.run_ends(),
+///     &Int16Array::from(vec![Some(3), Some(4), Some(6)])
+/// );
+///
+/// let av = array.values();
+///
+/// assert!(!av.is_null(0));
+/// assert!(av.is_null(1));
+/// assert!(!av.is_null(2));
+///
+/// // Values are polymorphic and so require a downcast.
+/// let ava: &UInt32Array = as_primitive_array::<UInt32Type>(av.as_ref());
+///
+/// assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)]));
+/// ```
+#[derive(Debug)]
+pub struct PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Run end (logical index just past the last slot) of every flushed run.
+    run_ends_builder: PrimitiveBuilder<R>,
+    /// Value (or null) of every flushed run.
+    values_builder: PrimitiveBuilder<V>,
+    /// Value (or null) of the run currently being extended.
+    current_value: Option<V::Native>,
+    /// Number of logical slots appended so far, i.e. the end of the open run.
+    current_run_end_index: usize,
+    /// Run end of the most recently flushed run.
+    prev_run_end_index: usize,
+}
+
+/// An empty builder; identical to [`PrimitiveRunBuilder::new`].
+impl<R, V> Default for PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    fn default() -> Self {
+        PrimitiveRunBuilder::<R, V>::new()
+    }
+}
+
+impl<R, V> PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Creates a new, empty `PrimitiveRunBuilder`.
+    pub fn new() -> Self {
+        Self::from_child_builders(PrimitiveBuilder::new(), PrimitiveBuilder::new())
+    }
+
+    /// Creates a new, empty `PrimitiveRunBuilder` with pre-allocated space.
+    ///
+    /// `capacity`: the expected number of run-end encoded values (runs); both the
+    /// run-ends and the values child builders reserve this many slots.
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self::from_child_builders(
+            PrimitiveBuilder::with_capacity(capacity),
+            PrimitiveBuilder::with_capacity(capacity),
+        )
+    }
+
+    /// Wraps the given child builders in a builder that has no open run.
+    fn from_child_builders(
+        run_ends_builder: PrimitiveBuilder<R>,
+        values_builder: PrimitiveBuilder<V>,
+    ) -> Self {
+        Self {
+            run_ends_builder,
+            values_builder,
+            current_value: None,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+}
+
+impl<R, V> ArrayBuilder for PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Exposes the builder as a shared `Any` reference (for downcasting).
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Exposes the builder as a mutable `Any` reference (for downcasting).
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    /// Converts the boxed builder into a boxed `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+
+    /// Logical length of the array being built: every appended slot counts,
+    /// not just the number of runs.
+    fn len(&self) -> usize {
+        self.current_run_end_index
+    }
+
+    /// `true` when no logical slot has been appended yet.
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Builds the [`RunArray`] as an [`ArrayRef`] and resets the builder.
+    fn finish(&mut self) -> ArrayRef {
+        let array: RunArray<R> = PrimitiveRunBuilder::<R, V>::finish(self);
+        Arc::new(array)
+    }
+
+    /// Builds the [`RunArray`] as an [`ArrayRef`] without resetting the builder.
+    fn finish_cloned(&self) -> ArrayRef {
+        let array: RunArray<R> = PrimitiveRunBuilder::<R, V>::finish_cloned(self);
+        Arc::new(array)
+    }
+}
+
+impl<R, V> PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Appends optional value to the logical array encoded by the RunArray.
+    ///
+    /// A value equal to the current one (two nulls count as equal) extends the
+    /// open run; any other value closes the open run and starts a new one.
+    ///
+    /// # Panics
+    ///
+    /// Panics if closing the open run requires a run end that does not fit in
+    /// `R::Native`.
+    pub fn append_option(&mut self, value: Option<V::Native>) {
+        if self.current_run_end_index == 0 {
+            // First slot of a new (or freshly finished) builder: open a run.
+            self.current_run_end_index = 1;
+            self.current_value = value;
+            return;
+        }
+        if self.current_value != value {
+            self.append_run_end();
+            self.current_value = value;
+        }
+
+        self.current_run_end_index += 1;
+    }
+
+    /// Appends value to the logical array encoded by the run-ends array.
+    ///
+    /// See [`Self::append_option`] for run-merging and panic behavior.
+    pub fn append_value(&mut self, value: V::Native) {
+        self.append_option(Some(value))
+    }
+
+    /// Appends null to the logical array encoded by the run-ends array.
+    ///
+    /// See [`Self::append_option`] for run-merging and panic behavior.
+    pub fn append_null(&mut self) {
+        self.append_option(None)
+    }
+
+    /// Creates the RunArray and resets the builder so it can be reused.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the RunArray cannot be built, or if the final run end does not
+    /// fit in `R::Native`.
+    pub fn finish(&mut self) -> RunArray<R> {
+        // write the last run end to the array.
+        self.append_run_end();
+
+        // Reset all run-tracking state. `prev_run_end_index` must be cleared too:
+        // `append_run_end` skips flushing whenever it equals
+        // `current_run_end_index`, so a stale value would silently drop the last
+        // run of the next batch, and would make `finish_cloned` on an untouched
+        // builder append a bogus run end of 0.
+        self.current_value = None;
+        self.current_run_end_index = 0;
+        self.prev_run_end_index = 0;
+
+        // build the run encoded array by adding run_ends and values array as its children.
+        let run_ends_array = self.run_ends_builder.finish();
+        let values_array = self.values_builder.finish();
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    /// Creates the RunArray without resetting the builder.
+    ///
+    /// The open run, if any, is included in the output but stays open in the
+    /// builder, so later appends of the same value keep extending it.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the RunArray cannot be built, or if the open run's end does not
+    /// fit in `R::Native`.
+    pub fn finish_cloned(&self) -> RunArray<R> {
+        let mut run_ends_array = self.run_ends_builder.finish_cloned();
+        let mut values_array = self.values_builder.finish_cloned();
+
+        // Add current run if one exists
+        if self.prev_run_end_index != self.current_run_end_index {
+            let mut run_end_builder = run_ends_array.into_builder().unwrap();
+            let mut values_builder = values_array.into_builder().unwrap();
+            self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
+            run_ends_array = run_end_builder.finish();
+            values_array = values_builder.finish();
+        }
+
+        RunArray::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    // Flushes the open run (its end and its value) into the child builders.
+    fn append_run_end(&mut self) {
+        // empty array or the function called without appending any value.
+        if self.prev_run_end_index == self.current_run_end_index {
+            return;
+        }
+        let run_end_index = self.run_end_index_as_native();
+        self.run_ends_builder.append_value(run_end_index);
+        self.values_builder.append_option(self.current_value);
+        self.prev_run_end_index = self.current_run_end_index;
+    }
+
+    // Similar to `append_run_end` but on caller-supplied builders.
+    // Used in `finish_cloned`, which is not supposed to mutate `self`.
+    fn append_run_end_with_builders(
+        &self,
+        run_ends_builder: &mut PrimitiveBuilder<R>,
+        values_builder: &mut PrimitiveBuilder<V>,
+    ) {
+        let run_end_index = self.run_end_index_as_native();
+        run_ends_builder.append_value(run_end_index);
+        values_builder.append_option(self.current_value);
+    }
+
+    // Converts the open run's end to the run-ends native type, panicking on overflow.
+    fn run_end_index_as_native(&self) -> R::Native {
+        R::Native::from_usize(self.current_run_end_index)
+            .unwrap_or_else(|| panic!(
+                "Cannot convert `current_run_end_index` {} from `usize` to native form of arrow datatype {}",
+                self.current_run_end_index,
+                R::DATA_TYPE
+            ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::builder::PrimitiveRunBuilder;
+    use crate::cast::as_primitive_array;
+    use crate::types::{Int16Type, UInt32Type};
+    use crate::{Array, Int16Array, UInt32Array};
+
+    #[test]
+    fn test_primitive_ree_array_builder() {
+        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
+        builder.append_value(1234);
+        builder.append_value(1234);
+        builder.append_value(1234);
+        builder.append_null();
+        builder.append_value(5678);
+        builder.append_value(5678);
+
+        let array = builder.finish();
+
+        assert_eq!(array.null_count(), 0);
+        assert_eq!(array.len(), 6);
+
+        assert_eq!(
+            array.run_ends(),
+            &Int16Array::from(vec![Some(3), Some(4), Some(6)])
+        );
+
+        let av = array.values();
+
+        assert!(!av.is_null(0));
+        assert!(av.is_null(1));
+        assert!(!av.is_null(2));
+
+        // Values are polymorphic and so require a downcast.
+        let ava: &UInt32Array = as_primitive_array::<UInt32Type>(av.as_ref());
+
+        assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)]));
+    }
+
+    /// Regression test: `finish` must fully reset the builder so that a second
+    /// batch (here, of the same logical length as the first) is not truncated.
+    #[test]
+    fn test_primitive_run_builder_reuse_after_finish() {
+        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
+        builder.append_value(1);
+        builder.append_value(1);
+        builder.append_value(1);
+        let first = builder.finish();
+        assert_eq!(first.len(), 3);
+        assert_eq!(first.run_ends(), &Int16Array::from(vec![Some(3)]));
+
+        // Nothing appended since `finish`: the snapshot must be empty.
+        let empty = builder.finish_cloned();
+        assert_eq!(empty.len(), 0);
+
+        builder.append_value(2);
+        builder.append_value(2);
+        builder.append_value(2);
+        let second = builder.finish();
+        assert_eq!(second.len(), 3);
+        assert_eq!(second.run_ends(), &Int16Array::from(vec![Some(3)]));
+
+        let values = as_primitive_array::<UInt32Type>(second.values().as_ref());
+        assert_eq!(values, &UInt32Array::from(vec![2]));
+    }
+}
diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs
index 7c41a469e..fc02c0e5a 100644
--- a/arrow-array/src/types.rs
+++ b/arrow-array/src/types.rs
@@ -240,6 +240,31 @@ impl ArrowDictionaryKeyType for UInt32Type {}
impl ArrowDictionaryKeyType for UInt64Type {}
+mod run {
+    use super::*;
+
+    /// Supertrait of `RunEndIndexType`. Because it lives in this private
+    /// module, code outside the crate cannot name it and so cannot implement
+    /// `RunEndIndexType` for additional types.
+    pub trait RunEndTypeSealed {}
+
+    impl RunEndTypeSealed for Int16Type {}
+    impl RunEndTypeSealed for Int32Type {}
+    impl RunEndTypeSealed for Int64Type {}
+}
+
+/// A primitive type usable for the run-ends index of a `RunArray`:
+/// one of `Int16Type`, `Int32Type` or `Int64Type`.
+///
+/// See <https://arrow.apache.org/docs/format/Columnar.html>
+///
+/// Note: this trait is sealed to avoid accidental misuse; it cannot be
+/// implemented outside this crate.
+pub trait RunEndIndexType: ArrowPrimitiveType + run::RunEndTypeSealed {}
+
+impl RunEndIndexType for Int16Type {}
+impl RunEndIndexType for Int32Type {}
+impl RunEndIndexType for Int64Type {}
+
/// A subtype of primitive type that represents temporal values.
pub trait ArrowTemporalType: ArrowPrimitiveType {}
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 258ee082d..07bbc6642 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -198,9 +198,9 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
],
_ => unreachable!(),
},
- DataType::FixedSizeList(_, _) | DataType::Struct(_) => {
- [empty_buffer, MutableBuffer::new(0)]
- }
+ DataType::FixedSizeList(_, _)
+ | DataType::Struct(_)
+ | DataType::RunEndEncoded(_, _) => [empty_buffer, MutableBuffer::new(0)],
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => [
MutableBuffer::new(capacity * mem::size_of::<u8>()),
empty_buffer,
@@ -724,6 +724,12 @@ impl ArrayData {
DataType::Dictionary(_, data_type) => {
vec![Self::new_empty(data_type)]
}
+ DataType::RunEndEncoded(run_ends, values) => {
+ vec![
+ Self::new_empty(run_ends.data_type()),
+ Self::new_empty(values.data_type()),
+ ]
+ }
};
// Data was constructed correctly above
@@ -853,6 +859,19 @@ impl ArrayData {
)));
}
}
+ DataType::RunEndEncoded(run_ends_type, _) => {
+ if run_ends_type.is_nullable() {
+ return Err(ArrowError::InvalidArgumentError(
+ "The nullable should be set to false for the field defining run_ends array.".to_string()
+ ));
+ }
+ if !DataType::is_run_ends_type(run_ends_type.data_type()) {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "RunArray run_ends types must be Int16, Int32 or Int64, but was {}",
+ run_ends_type.data_type()
+ )));
+ }
+ }
_ => {}
};
@@ -998,6 +1017,25 @@ impl ArrayData {
}
Ok(())
}
+ DataType::RunEndEncoded(run_ends_field, values_field) => {
+ self.validate_num_child_data(2)?;
+ let run_ends_data =
+ self.get_valid_child_data(0, run_ends_field.data_type())?;
+ let values_data =
+ self.get_valid_child_data(1, values_field.data_type())?;
+ if run_ends_data.len != values_data.len {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "The run_ends array length should be the same as values array length. Run_ends array length is {}, values array length is {}",
+ run_ends_data.len, values_data.len
+ )));
+ }
+ if run_ends_data.null_count() > 0 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Found null values in run_ends array. The run_ends array should not have null values.".to_string(),
+ ));
+ }
+ Ok(())
+ }
DataType::Union(fields, _, mode) => {
self.validate_num_child_data(fields.len())?;
@@ -1286,6 +1324,15 @@ impl ArrayData {
_ => unreachable!(),
}
}
+ DataType::RunEndEncoded(run_ends, _values) => {
+ let run_ends_data = self.child_data()[0].clone();
+ match run_ends.data_type() {
+ DataType::Int16 => run_ends_data.check_run_ends::<i16>(self.len()),
+ DataType::Int32 => run_ends_data.check_run_ends::<i32>(self.len()),
+ DataType::Int64 => run_ends_data.check_run_ends::<i64>(self.len()),
+ _ => unreachable!(),
+ }
+ }
_ => {
// No extra validation check required for other types
Ok(())
@@ -1446,6 +1493,50 @@ impl ArrayData {
})
}
+    /// Validates the run-ends child data of a run-end encoded array.
+    ///
+    /// `self` is the run-ends child and `array_len` is the logical length of the
+    /// parent array. Every run end must convert to `i64`, be strictly positive,
+    /// and be strictly greater than the previous one; the last run end must
+    /// equal `array_len` (so an empty run-ends array is accepted only when
+    /// `array_len` is 0).
+    ///
+    /// NOTE(review): only the parent's length is compared against the last run
+    /// end; any parent offset is not taken into account — confirm this is the
+    /// intended behavior for sliced arrays.
+    fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
+    where
+        T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
+    {
+        let run_ends = self.typed_buffer::<T>(0, self.len())?;
+        let mut last_run_end: i64 = 0_i64;
+
+        for (ix, &raw_run_end) in run_ends.iter().enumerate() {
+            let value: i64 = raw_run_end.try_into().map_err(|_| {
+                ArrowError::InvalidArgumentError(format!(
+                    "Value at position {} out of bounds: {} (can not convert to i64)",
+                    ix, raw_run_end
+                ))
+            })?;
+
+            if value <= 0_i64 {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "The values in run_ends array should be strictly positive. Found value {} at index {} that does not match the criteria.",
+                    value,
+                    ix
+                )));
+            }
+
+            if ix > 0 && value <= last_run_end {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "The values in run_ends array should be strictly increasing. Found value {} at index {} with previous value {} that does not match the criteria.",
+                    value,
+                    ix,
+                    last_run_end
+                )));
+            }
+
+            last_run_end = value;
+        }
+
+        if last_run_end.as_usize() == array_len {
+            Ok(())
+        } else {
+            Err(ArrowError::InvalidArgumentError(format!(
+                "The length of array does not match the last value in the run_ends array. The last value of run_ends array is {} and length of array is {}.",
+                last_run_end,
+                array_len
+            )))
+        }
+    }
+
/// Returns true if this `ArrayData` is equal to `other`, using pointer comparisons
/// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
/// return false when the arrays are logically equal
@@ -1542,6 +1633,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
+ DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data,
DataType::Union(_, _, mode) => {
let type_ids = BufferSpec::FixedWidth {
byte_width: size_of::<i8>(),
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index 85c595cfe..aff61e3d3 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -137,6 +137,7 @@ fn equal_values(
},
DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start, rhs_start, len),
DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
+ DataType::RunEndEncoded(_, _) => todo!(),
}
}
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index 6a8c89d25..2a24b1cc2 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -230,6 +230,7 @@ fn build_extend(array: &ArrayData) -> Extend {
UnionMode::Sparse => union::build_extend_sparse(array),
UnionMode::Dense => union::build_extend_dense(array),
},
+ DataType::RunEndEncoded(_, _) => todo!(),
}
}
@@ -281,6 +282,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
UnionMode::Sparse => union::extend_nulls_sparse,
UnionMode::Dense => union::extend_nulls_dense,
},
+ DataType::RunEndEncoded(_, _) => todo!(),
})
}
@@ -473,6 +475,20 @@ impl<'a> MutableArrayData<'a> {
})
.collect::<Vec<_>>(),
},
+ DataType::RunEndEncoded(_, _) => {
+ let run_ends_child = arrays
+ .iter()
+ .map(|array| &array.child_data()[0])
+ .collect::<Vec<_>>();
+ let value_child = arrays
+ .iter()
+ .map(|array| &array.child_data()[1])
+ .collect::<Vec<_>>();
+ vec![
+ MutableArrayData::new(run_ends_child, false, array_capacity),
+ MutableArrayData::new(value_child, use_nulls, array_capacity),
+ ]
+ }
DataType::FixedSizeList(_, _) => {
let childs = arrays
.iter()
diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs
index dd0b95b0a..c2e326b4f 100644
--- a/arrow-integration-test/src/datatype.rs
+++ b/arrow-integration-test/src/datatype.rs
@@ -357,6 +357,7 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value {
DataType::Map(_, keys_sorted) => {
json!({"name": "map", "keysSorted": keys_sorted})
}
+ DataType::RunEndEncoded(_, _) => todo!(),
}
}
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index a60a19b86..305bb943c 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -711,6 +711,7 @@ pub(crate) fn get_fb_field_type<'a>(
children: Some(fbb.create_vector(&children[..])),
}
}
+ RunEndEncoded(_, _) => todo!(),
Map(map_field, keys_sorted) => {
let child = build_field(fbb, map_field);
let mut field_type = crate::MapBuilder::new(fbb);
diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index da1c20ddb..1e5c1321c 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -242,6 +242,18 @@ pub enum DataType {
/// child fields may be respectively "entries", "key", and "value", but this is
/// not enforced.
Map(Box<Field>, bool),
+ /// A run-end encoding (REE) is a variation of run-length encoding (RLE). These
+ /// encodings are well-suited for representing data containing sequences of the
+ /// same value, called runs. Each run is represented as a value and an integer giving
+ /// the index in the array where the run ends.
+ ///
+ /// A run-end encoded array has no buffers by itself, but has two child arrays. The
+ /// first child array, called the run ends array, holds either 16, 32, or 64-bit
+ /// signed integers. The actual values of each run are held in the second child array.
+ ///
+ /// These child arrays are prescribed the standard names of "run_ends" and "values"
+ /// respectively.
+ RunEndEncoded(Box<Field>, Box<Field>),
}
/// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds.
@@ -346,6 +358,13 @@ impl DataType {
)
}
+    /// Returns `true` if this type may be used for the run-ends child of a
+    /// `RunArray`, i.e. it is one of `Int16`, `Int32` or `Int64`.
+    #[inline]
+    pub fn is_run_ends_type(&self) -> bool {
+        matches!(self, DataType::Int16 | DataType::Int32 | DataType::Int64)
+    }
+
/// Returns true if this type is nested (List, FixedSizeList, LargeList, Struct, Union,
/// or Map), or a dictionary of a nested type
pub fn is_nested(&self) -> bool {
@@ -438,6 +457,10 @@ impl DataType {
+ (std::mem::size_of::<Field>() * fields.capacity())
}
DataType::Dictionary(dt1, dt2) => dt1.size() + dt2.size(),
+ DataType::RunEndEncoded(run_ends, values) => {
+ run_ends.size() - std::mem::size_of_val(run_ends) + values.size()
+ - std::mem::size_of_val(values)
+ }
}
}
}
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index ea60572b3..6213af8bc 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -41,6 +41,7 @@ pub enum ArrowError {
/// Error during import or export to/from the C Data Interface
CDataInterface(String),
DictionaryKeyOverflowError,
+ RunEndIndexOverflowError,
}
impl ArrowError {
@@ -96,6 +97,9 @@ impl Display for ArrowError {
ArrowError::DictionaryKeyOverflowError => {
write!(f, "Dictionary key bigger than the key type")
}
+ ArrowError::RunEndIndexOverflowError => {
+ write!(f, "Run end encoded array index overflow error")
+ }
}
}
}
diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs
index a3275dcb3..dc3ab3d62 100644
--- a/arrow-schema/src/field.rs
+++ b/arrow-schema/src/field.rs
@@ -410,6 +410,7 @@ impl Field {
| DataType::List(_)
| DataType::Map(_, _)
| DataType::Dictionary(_, _)
+ | DataType::RunEndEncoded(_, _)
| DataType::FixedSizeList(_, _)
| DataType::FixedSizeBinary(_)
| DataType::Utf8
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 311981593..c459d40d7 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -360,7 +360,7 @@ fn write_leaves<W: Write>(
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
)),
- ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) => {
+ ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) | ArrowDataType::RunEndEncoded(_, _) => {
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 2ca4b7ef8..d81d6a69b 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -507,6 +507,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
let dict_field = Field::new(name, *value.clone(), field.is_nullable());
arrow_to_parquet_type(&dict_field)
}
+ DataType::RunEndEncoded(_, _) => Err(arrow_err!("Converting RunEndEncodedType to parquet not supported",))
}
}