You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/25 14:04:31 UTC

[arrow-rs] branch master updated: feat: Add `RunEndEncodedArray` (#3553)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f0be9da82 feat: Add `RunEndEncodedArray` (#3553)
f0be9da82 is described below

commit f0be9da82cbd76da3042b426daf6c424c9560d93
Author: askoa <11...@users.noreply.github.com>
AuthorDate: Wed Jan 25 09:04:25 2023 -0500

    feat: Add `RunEndEncodedArray` (#3553)
    
    * Add `RunEndEncodedArray`
    
    * fix doctest and clippy issues
    
    * fix doc issues
    
    * fix doc issue
    
    * add validation for run_ends array and corresponding tests
    
    * PR comments
    
    * seal ArrowRunEndIndexType per PR suggestion
    
    * Fix PR suggestions
    
    * few more PR coments
    
    * run array name change
    
    * fix doc issues
    
    * doc change
    
    * lint fix
    
    * make append methods infallible
    
    * fix array.len and other minor changes
    
    * formatting fix
    
    * add validation of array len
    
    * fmt fix
    
    * PR comment and some documentation changes
    
    * pr suggestion
    
    * empty commit
    
    Co-authored-by: ask <as...@local>
---
 arrow-array/src/array/mod.rs                       |  18 +
 arrow-array/src/array/run_array.rs                 | 507 ++++++++++++++++++++
 .../src/builder/generic_byte_run_builder.rs        | 519 +++++++++++++++++++++
 arrow-array/src/builder/mod.rs                     |   4 +
 arrow-array/src/builder/primitive_run_builder.rs   | 294 ++++++++++++
 arrow-array/src/types.rs                           |  25 +
 arrow-data/src/data.rs                             |  98 +++-
 arrow-data/src/equal/mod.rs                        |   1 +
 arrow-data/src/transform/mod.rs                    |  16 +
 arrow-integration-test/src/datatype.rs             |   1 +
 arrow-ipc/src/convert.rs                           |   1 +
 arrow-schema/src/datatype.rs                       |  23 +
 arrow-schema/src/error.rs                          |   4 +
 arrow-schema/src/field.rs                          |   1 +
 parquet/src/arrow/arrow_writer/mod.rs              |   2 +-
 parquet/src/arrow/schema/mod.rs                    |   1 +
 16 files changed, 1511 insertions(+), 4 deletions(-)

diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 1e17e35d0..69f6ba4d8 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -64,6 +64,9 @@ pub use struct_array::*;
 mod union_array;
 pub use union_array::*;
 
+mod run_array;
+pub use run_array::*;
+
 /// Trait for dealing with different types of array at runtime when the type of the
 /// array is not known in advance.
 pub trait Array: std::fmt::Debug + Send + Sync {
@@ -579,6 +582,20 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
             }
             dt => panic!("Unexpected dictionary key type {:?}", dt),
         },
+        DataType::RunEndEncoded(ref run_ends_type, _) => {
+            match run_ends_type.data_type() {
+                DataType::Int16 => {
+                    Arc::new(RunArray::<Int16Type>::from(data)) as ArrayRef
+                }
+                DataType::Int32 => {
+                    Arc::new(RunArray::<Int32Type>::from(data)) as ArrayRef
+                }
+                DataType::Int64 => {
+                    Arc::new(RunArray::<Int64Type>::from(data)) as ArrayRef
+                }
+                dt => panic!("Unexpected data type for run_ends array {:?}", dt),
+            }
+        }
         DataType::Null => Arc::new(NullArray::from(data)) as ArrayRef,
         DataType::Decimal128(_, _) => Arc::new(Decimal128Array::from(data)) as ArrayRef,
         DataType::Decimal256(_, _) => Arc::new(Decimal256Array::from(data)) as ArrayRef,
@@ -737,6 +754,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
             new_null_sized_decimal(data_type, length, std::mem::size_of::<i128>())
         }
         DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32),
+        DataType::RunEndEncoded(_, _) => todo!(),
     }
 }
 
diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
new file mode 100644
index 000000000..0e39cd288
--- /dev/null
+++ b/arrow-array/src/array/run_array.rs
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use arrow_buffer::ArrowNativeType;
+use arrow_data::{ArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType, Field};
+
+use crate::{
+    builder::StringRunBuilder,
+    make_array,
+    types::{Int16Type, Int32Type, Int64Type, RunEndIndexType},
+    Array, ArrayRef, PrimitiveArray,
+};
+
+///
+/// A run-end encoding (REE) is a variation of [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding).
+///
+/// This encoding is good for representing data containing the same value repeated consecutively.
+///
+/// [`RunArray`] contains a `run_ends` array and a `values` array of the same length.
+/// The `run_ends` array stores the indexes at which each run ends. The `values` array
+/// stores the value of each run. The example below illustrates how a logical array is
+/// represented in a [`RunArray`].
+///
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐
+///   ┌─────────────────┐  ┌─────────┐       ┌─────────────────┐
+/// │ │        A        │  │    2    │ │     │        A        │     
+///   ├─────────────────┤  ├─────────┤       ├─────────────────┤
+/// │ │        D        │  │    3    │ │     │        A        │    run length of 'A' = run_ends[0] - 0 = 2
+///   ├─────────────────┤  ├─────────┤       ├─────────────────┤
+/// │ │        B        │  │    6    │ │     │        D        │    run length of 'D' = run_ends[1] - run_ends[0] = 1
+///   └─────────────────┘  └─────────┘       ├─────────────────┤
+/// │        values          run_ends  │     │        B        │     
+///                                          ├─────────────────┤
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘     │        B        │     
+///                                          ├─────────────────┤
+///                RunArray                  │        B        │    run length of 'B' = run_ends[2] - run_ends[1] = 3
+///               length = 3                 └─────────────────┘
+///  
+///                                             Logical array
+///                                                Contents
+/// ```
+
+pub struct RunArray<R: RunEndIndexType> {
+    data: ArrayData,
+    run_ends: PrimitiveArray<R>,
+    values: ArrayRef,
+}
+
+impl<R: RunEndIndexType> RunArray<R> {
+    // calculates the logical length of the array encoded
+    // by the given run_ends array.
+    fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
+        let len = run_ends.len();
+        if len == 0 {
+            return 0;
+        }
+        run_ends.value(len - 1).as_usize()
+    }
+
+    /// Attempts to create RunArray using given run_ends (index where a run ends)
+    /// and the values (value of the run). Returns an error if the given data is not compatible
+    /// with RunEndEncoded specification.
+    pub fn try_new(
+        run_ends: &PrimitiveArray<R>,
+        values: &dyn Array,
+    ) -> Result<Self, ArrowError> {
+        let run_ends_type = run_ends.data_type().clone();
+        let values_type = values.data_type().clone();
+        let ree_array_type = DataType::RunEndEncoded(
+            Box::new(Field::new("run_ends", run_ends_type, false)),
+            Box::new(Field::new("values", values_type, true)),
+        );
+        let len = RunArray::logical_len(run_ends);
+        let builder = ArrayDataBuilder::new(ree_array_type)
+            .len(len)
+            .add_child_data(run_ends.data().clone())
+            .add_child_data(values.data().clone());
+
+        // `build_unchecked` is used to avoid recursive validation of child arrays.
+        let array_data = unsafe { builder.build_unchecked() };
+
+        // Safety: `validate_data` checks below
+        //    1. The given array data has exactly two child arrays.
+        //    2. The first child array (run_ends) has valid data type.
+        //    3. run_ends array does not have null values
+        //    4. run_ends array has non-zero and strictly increasing values.
+        //    5. The length of run_ends array and values array are the same.
+        array_data.validate_data()?;
+
+        Ok(array_data.into())
+    }
+
+    /// Returns a reference to run_ends array
+    ///
+    /// Note: any slicing of this array is not applied to the returned array
+    /// and must be handled separately
+    pub fn run_ends(&self) -> &PrimitiveArray<R> {
+        &self.run_ends
+    }
+
+    /// Returns a reference to values array
+    pub fn values(&self) -> &ArrayRef {
+        &self.values
+    }
+}
+
+impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
+    // The method assumes the caller already validated the data using `ArrayData::validate_data()`
+    fn from(data: ArrayData) -> Self {
+        match data.data_type() {
+            DataType::RunEndEncoded(_, _) => {}
+            _ => {
+                panic!("Invalid data type for RunArray. The data type should be DataType::RunEndEncoded");
+            }
+        }
+
+        let run_ends = PrimitiveArray::<R>::from(data.child_data()[0].clone());
+        let values = make_array(data.child_data()[1].clone());
+        Self {
+            data,
+            run_ends,
+            values,
+        }
+    }
+}
+
+impl<R: RunEndIndexType> From<RunArray<R>> for ArrayData {
+    fn from(array: RunArray<R>) -> Self {
+        array.data
+    }
+}
+
+impl<T: RunEndIndexType> Array for RunArray<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data(&self) -> &ArrayData {
+        &self.data
+    }
+
+    fn into_data(self) -> ArrayData {
+        self.into()
+    }
+}
+
+impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        writeln!(
+            f,
+            "RunArray {{run_ends: {:?}, values: {:?}}}",
+            self.run_ends, self.values
+        )
+    }
+}
+
+/// Constructs a `RunArray` from an iterator of optional strings.
+///
+/// # Example:
+/// ```
+/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
+///
+/// let test = vec!["a", "a", "b", "c", "c"];
+/// let array: RunArray<Int16Type> = test
+///     .iter()
+///     .map(|&x| if x == "b" { None } else { Some(x) })
+///     .collect();
+/// assert_eq!(
+///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  5,\n], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
+///     format!("{:?}", array)
+/// );
+/// ```
+impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
+    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
+        let it = iter.into_iter();
+        let (lower, _) = it.size_hint();
+        let mut builder = StringRunBuilder::with_capacity(lower, 256);
+        it.for_each(|i| {
+            builder.append_option(i);
+        });
+
+        builder.finish()
+    }
+}
+
+/// Constructs a `RunArray` from an iterator of strings.
+///
+/// # Example:
+///
+/// ```
+/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
+///
+/// let test = vec!["a", "a", "b", "c"];
+/// let array: RunArray<Int16Type> = test.into_iter().collect();
+/// assert_eq!(
+///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  4,\n], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
+///     format!("{:?}", array)
+/// );
+/// ```
+impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
+    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
+        let it = iter.into_iter();
+        let (lower, _) = it.size_hint();
+        let mut builder = StringRunBuilder::with_capacity(lower, 256);
+        it.for_each(|i| {
+            builder.append_value(i);
+        });
+
+        builder.finish()
+    }
+}
+
+///
+/// A [`RunArray`] array where run ends are stored using `i16` data type.
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int16RunArray, Int16Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int16RunArray = RunArray<Int16Type>;
+
+///
+/// A [`RunArray`] array where run ends are stored using `i32` data type.
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int32RunArray, Int32Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int32RunArray = RunArray<Int32Type>;
+
+///
+/// A [`RunArray`] array where run ends are stored using `i64` data type.
+///
+/// # Example: Using `collect`
+/// ```
+/// # use arrow_array::{Array, Int64RunArray, Int64Array, StringArray};
+/// # use std::sync::Arc;
+///
+/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.values(), &values);
+/// ```
+pub type Int64RunArray = RunArray<Int64Type>;
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::builder::PrimitiveRunBuilder;
+    use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
+    use crate::{Array, Int16Array, Int32Array, StringArray};
+
+    #[test]
+    fn test_run_array() {
+        // Construct a value array
+        let value_data = PrimitiveArray::<Int8Type>::from_iter_values([
+            10_i8, 11, 12, 13, 14, 15, 16, 17,
+        ]);
+
+        // Construct a run_ends array:
+        let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
+            4_i16, 6, 7, 9, 13, 18, 20, 22,
+        ]);
+
+        // Construct a run ends encoded array from the above two
+        let ree_array =
+            RunArray::<Int16Type>::try_new(&run_ends_data, &value_data).unwrap();
+
+        assert_eq!(ree_array.len(), 22);
+        assert_eq!(ree_array.null_count(), 0);
+
+        let values = ree_array.values();
+        assert_eq!(&value_data.into_data(), values.data());
+        assert_eq!(&DataType::Int8, values.data_type());
+
+        let run_ends = ree_array.run_ends();
+        assert_eq!(&run_ends_data.into_data(), run_ends.data());
+        assert_eq!(&DataType::Int16, run_ends.data_type());
+    }
+
+    #[test]
+    fn test_run_array_fmt_debug() {
+        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(3);
+        builder.append_value(12345678);
+        builder.append_null();
+        builder.append_value(22345678);
+        let array = builder.finish();
+        assert_eq!(
+            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  1,\n  2,\n  3,\n], values: PrimitiveArray<UInt32>\n[\n  12345678,\n  null,\n  22345678,\n]}\n",
+            format!("{:?}", array)
+        );
+
+        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(20);
+        for _ in 0..20 {
+            builder.append_value(1);
+        }
+        let array = builder.finish();
+
+        assert_eq!(array.len(), 20);
+        assert_eq!(array.null_count(), 0);
+
+        assert_eq!(
+            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  20,\n], values: PrimitiveArray<UInt32>\n[\n  1,\n]}\n",
+            format!("{:?}", array)
+        );
+    }
+
+    #[test]
+    fn test_run_array_from_iter() {
+        let test = vec!["a", "a", "b", "c"];
+        let array: RunArray<Int16Type> = test
+            .iter()
+            .map(|&x| if x == "b" { None } else { Some(x) })
+            .collect();
+        assert_eq!(
+            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  4,\n], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
+            format!("{:?}", array)
+        );
+
+        assert_eq!(array.len(), 4);
+        assert_eq!(array.null_count(), 0);
+
+        let array: RunArray<Int16Type> = test.into_iter().collect();
+        assert_eq!(
+            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  4,\n], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
+            format!("{:?}", array)
+        );
+    }
+
+    #[test]
+    fn test_run_array_run_ends_as_primitive_array() {
+        let test = vec!["a", "b", "c", "a"];
+        let array: RunArray<Int16Type> = test.into_iter().collect();
+
+        assert_eq!(array.len(), 4);
+        assert_eq!(array.null_count(), 0);
+
+        let run_ends = array.run_ends();
+        assert_eq!(&DataType::Int16, run_ends.data_type());
+        assert_eq!(0, run_ends.null_count());
+        assert_eq!(&[1, 2, 3, 4], run_ends.values());
+    }
+
+    #[test]
+    fn test_run_array_as_primitive_array_with_null() {
+        let test = vec![Some("a"), None, Some("b"), None, None, Some("a")];
+        let array: RunArray<Int32Type> = test.into_iter().collect();
+
+        assert_eq!(array.len(), 6);
+        assert_eq!(array.null_count(), 0);
+
+        let run_ends = array.run_ends();
+        assert_eq!(&DataType::Int32, run_ends.data_type());
+        assert_eq!(0, run_ends.null_count());
+        assert_eq!(5, run_ends.len());
+        assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());
+
+        let values_data = array.values();
+        assert_eq!(2, values_data.null_count());
+        assert_eq!(5, values_data.len());
+    }
+
+    #[test]
+    fn test_run_array_all_nulls() {
+        let test = vec![None, None, None];
+        let array: RunArray<Int32Type> = test.into_iter().collect();
+
+        assert_eq!(array.len(), 3);
+        assert_eq!(array.null_count(), 0);
+
+        let run_ends = array.run_ends();
+        assert_eq!(1, run_ends.len());
+        assert_eq!(&[3], run_ends.values());
+
+        let values_data = array.values();
+        assert_eq!(1, values_data.null_count());
+    }
+
+    #[test]
+    fn test_run_array_try_new() {
+        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
+            .into_iter()
+            .collect();
+        let run_ends: Int32Array =
+            [Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
+
+        let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
+        assert_eq!(array.run_ends().data_type(), &DataType::Int32);
+        assert_eq!(array.values().data_type(), &DataType::Utf8);
+
+        assert_eq!(array.null_count(), 0);
+        assert_eq!(array.len(), 4);
+        assert_eq!(array.run_ends.null_count(), 0);
+        assert_eq!(array.values().null_count(), 1);
+
+        assert_eq!(
+            "RunArray {run_ends: PrimitiveArray<Int32>\n[\n  1,\n  2,\n  3,\n  4,\n], values: StringArray\n[\n  \"foo\",\n  \"bar\",\n  null,\n  \"baz\",\n]}\n",
+            format!("{:?}", array)
+        );
+    }
+
+    #[test]
+    fn test_run_array_int16_type_definition() {
+        let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
+        let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+        assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+        assert_eq!(array.values(), &values);
+    }
+
+    #[test]
+    fn test_run_array_empty_string() {
+        let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
+        let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));
+        assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
+        assert_eq!(array.values(), &values);
+    }
+
+    #[test]
+    fn test_run_array_length_mismatch() {
+        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
+            .into_iter()
+            .collect();
+        let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect();
+
+        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+        let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string());
+        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+    }
+
+    #[test]
+    fn test_run_array_run_ends_with_null() {
+        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+            .into_iter()
+            .collect();
+        let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect();
+
+        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+        let expected = ArrowError::InvalidArgumentError("Found null values in run_ends array. The run_ends array should not have null values.".to_string());
+        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+    }
+
+    #[test]
+    fn test_run_array_run_ends_with_zeroes() {
+        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+            .into_iter()
+            .collect();
+        let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect();
+
+        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string());
+        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+    }
+
+    #[test]
+    fn test_run_array_run_ends_non_increasing() {
+        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
+            .into_iter()
+            .collect();
+        let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect();
+
+        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
+        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string());
+        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "PrimitiveArray expected ArrayData with type Int64 got Int32"
+    )]
+    fn test_run_array_run_ends_data_type_mismatch() {
+        let a = RunArray::<Int32Type>::from_iter(["32"]);
+        let _ = RunArray::<Int64Type>::from(a.into_data());
+    }
+}
diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs
new file mode 100644
index 000000000..c1ecbcb5d
--- /dev/null
+++ b/arrow-array/src/builder/generic_byte_run_builder.rs
@@ -0,0 +1,519 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::types::bytes::ByteArrayNativeType;
+use std::{any::Any, sync::Arc};
+
+use crate::{
+    types::{
+        BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType,
+        Utf8Type,
+    },
+    ArrayRef, ArrowPrimitiveType, RunArray,
+};
+
+use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
+
+use arrow_buffer::ArrowNativeType;
+
+/// Array builder for [`RunArray`] for String and Binary types.
+///
+/// # Example:
+///
+/// ```
+///
+/// # use arrow_array::builder::GenericByteRunBuilder;
+/// # use arrow_array::{GenericByteArray, BinaryArray};
+/// # use arrow_array::types::{BinaryType, Int16Type};
+/// # use arrow_array::{Array, Int16Array};
+/// # use arrow_array::cast::as_generic_binary_array;
+///
+/// let mut builder =
+/// GenericByteRunBuilder::<Int16Type, BinaryType>::new();
+/// builder.append_value(b"abc");
+/// builder.append_value(b"abc");
+/// builder.append_null();
+/// builder.append_value(b"def");
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///     array.run_ends(),
+///     &Int16Array::from(vec![Some(2), Some(3), Some(4)])
+/// );
+///
+/// let av = array.values();
+///
+/// assert!(!av.is_null(0));
+/// assert!(av.is_null(1));
+/// assert!(!av.is_null(2));
+///
+/// // Values are polymorphic and so require a downcast.
+/// let ava: &BinaryArray = as_generic_binary_array(av.as_ref());
+///
+/// assert_eq!(ava.value(0), b"abc");
+/// assert_eq!(ava.value(2), b"def");
+/// ```
+#[derive(Debug)]
+pub struct GenericByteRunBuilder<R, V>
+where
+    R: ArrowPrimitiveType,
+    V: ByteArrayType,
+{
+    run_ends_builder: PrimitiveBuilder<R>,
+    values_builder: GenericByteBuilder<V>,
+    current_value: Vec<u8>,
+    has_current_value: bool,
+    current_run_end_index: usize,
+    prev_run_end_index: usize,
+}
+
+impl<R, V> Default for GenericByteRunBuilder<R, V>
+where
+    R: ArrowPrimitiveType,
+    V: ByteArrayType,
+{
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<R, V> GenericByteRunBuilder<R, V>
+where
+    R: ArrowPrimitiveType,
+    V: ByteArrayType,
+{
+    /// Creates a new `GenericByteRunBuilder`
+    pub fn new() -> Self {
+        Self {
+            run_ends_builder: PrimitiveBuilder::new(),
+            values_builder: GenericByteBuilder::<V>::new(),
+            current_value: Vec::new(),
+            has_current_value: false,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+
+    /// Creates a new `GenericByteRunBuilder` with the provided capacity
+    ///
+    /// `capacity`: the expected number of run-end encoded values.
+    /// `data_capacity`: the expected number of bytes of run end encoded values
+    pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self {
+        Self {
+            run_ends_builder: PrimitiveBuilder::with_capacity(capacity),
+            values_builder: GenericByteBuilder::<V>::with_capacity(
+                capacity,
+                data_capacity,
+            ),
+            current_value: Vec::new(),
+            has_current_value: false,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+}
+
+impl<R, V> ArrayBuilder for GenericByteRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ByteArrayType,
+{
+    /// Returns the builder as a non-mutable `Any` reference.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+
+    /// Returns the length of logical array encoded by
+    /// the eventual runs array.
+    fn len(&self) -> usize {
+        self.current_run_end_index
+    }
+
+    /// Returns whether the number of array slots is zero
+    fn is_empty(&self) -> bool {
+        self.current_run_end_index == 0
+    }
+
+    /// Builds the array and resets this builder.
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+
+    /// Builds the array without resetting the builder.
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned())
+    }
+}
+
+impl<R, V> GenericByteRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ByteArrayType,
+{
+    /// Appends optional value to the logical array encoded by the RunArray.
+    pub fn append_option(&mut self, input_value: Option<impl AsRef<V::Native>>) {
+        match input_value {
+            Some(value) => self.append_value(value),
+            None => self.append_null(),
+        }
+    }
+
+    /// Appends value to the logical array encoded by the RunArray.
+    pub fn append_value(&mut self, input_value: impl AsRef<V::Native>) {
+        let value: &[u8] = input_value.as_ref().as_ref();
+        if !self.has_current_value {
+            self.append_run_end();
+            self.current_value.extend_from_slice(value);
+            self.has_current_value = true;
+        } else if self.current_value.as_slice() != value {
+            self.append_run_end();
+            self.current_value.clear();
+            self.current_value.extend_from_slice(value);
+        }
+        self.current_run_end_index += 1;
+    }
+
+    /// Appends null to the logical array encoded by the RunArray.
+    pub fn append_null(&mut self) {
+        if self.has_current_value {
+            self.append_run_end();
+            self.current_value.clear();
+            self.has_current_value = false;
+        }
+        self.current_run_end_index += 1;
+    }
+
+    /// Creates the RunArray and resets the builder.
+    /// Panics if RunArray cannot be built.
+    pub fn finish(&mut self) -> RunArray<R> {
+        // write the last run end to the array.
+        self.append_run_end();
+
+        // reset the run end index to zero.
+        self.current_value.clear();
+        self.has_current_value = false;
+        self.current_run_end_index = 0;
+        self.prev_run_end_index = 0;
+
+        // build the run encoded array by adding run_ends and values array as its children.
+        let run_ends_array = self.run_ends_builder.finish();
+        let values_array = self.values_builder.finish();
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    /// Creates the RunArray without resetting the builder.
+    /// Panics if RunArray cannot be built.
+    pub fn finish_cloned(&self) -> RunArray<R> {
+        let mut run_ends_array = self.run_ends_builder.finish_cloned();
+        let mut values_array = self.values_builder.finish_cloned();
+
+        // Add current run if one exists
+        if self.prev_run_end_index != self.current_run_end_index {
+            let mut run_end_builder = run_ends_array.into_builder().unwrap();
+            let mut values_builder = values_array.into_builder().unwrap();
+            self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
+            run_ends_array = run_end_builder.finish();
+            values_array = values_builder.finish();
+        }
+
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    // Writes the pending run (its run end and its value or null) to the child builders.
+    fn append_run_end(&mut self) {
+        // nothing is pending: empty array, or called again without appending any value.
+        if self.prev_run_end_index == self.current_run_end_index {
+            return;
+        }
+        let run_end_index = self.run_end_index_as_native();
+        self.run_ends_builder.append_value(run_end_index);
+        if self.has_current_value {
+            let slice = self.current_value.as_slice();
+            let native = unsafe {
+                // SAFETY:
+                // `self.current_value` holds bytes that were copied from a `V::Native`, so the
+                // `V::Native` can be built back from those bytes without validation.
+                V::Native::from_bytes_unchecked(slice)
+            };
+            self.values_builder.append_value(native);
+        } else {
+            self.values_builder.append_null();
+        }
+        self.prev_run_end_index = self.current_run_end_index;
+    }
+
+    // Same as `append_run_end`, but writes to the given builders and does not check whether a
+    // run is pending (the caller does). Used in `finish_cloned`, which must not mutate `self`.
+    fn append_run_end_with_builders(
+        &self,
+        run_ends_builder: &mut PrimitiveBuilder<R>,
+        values_builder: &mut GenericByteBuilder<V>,
+    ) {
+        let run_end_index = self.run_end_index_as_native();
+        run_ends_builder.append_value(run_end_index);
+        if self.has_current_value {
+            let slice = self.current_value.as_slice();
+            let native = unsafe {
+                // SAFETY:
+                // `self.current_value` holds bytes that were copied from a `V::Native`, so the
+                // `V::Native` can be built back from those bytes without validation.
+                V::Native::from_bytes_unchecked(slice)
+            };
+            values_builder.append_value(native);
+        } else {
+            values_builder.append_null();
+        }
+    }
+
+    // Converts `current_run_end_index` to the run-end native type; panics if the conversion fails.
+    fn run_end_index_as_native(&self) -> R::Native {
+        let index = self.current_run_end_index;
+        R::Native::from_usize(index).unwrap_or_else(|| panic!(
+            "Cannot convert the value {} from `usize` to native form of arrow datatype {}",
+            index, R::DATA_TYPE
+        ))
+    }
+}
+
+/// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]).
+///
+/// ```
+/// // Create a run-end encoded array whose run ends are stored as `i16`.
+/// // The encoded values are Strings.
+///
+/// # use arrow_array::builder::StringRunBuilder;
+/// # use arrow_array::{Int16Array, StringArray};
+/// # use arrow_array::types::Int16Type;
+/// # use arrow_array::cast::as_string_array;
+///
+/// let mut builder = StringRunBuilder::<Int16Type>::new();
+///
+/// // The builder builds the run array value by value; equal neighbours share one run
+/// builder.append_value("abc");
+/// builder.append_null();
+/// builder.append_value("def");
+/// builder.append_value("def");
+/// builder.append_value("abc");
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///   array.run_ends(),
+///   &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+/// );
+///
+/// // Values are polymorphic and so require a downcast.
+/// let av = array.values();
+/// let ava: &StringArray = as_string_array(av.as_ref());
+///
+/// assert_eq!(ava.value(0), "abc");
+/// assert!(av.is_null(1));
+/// assert_eq!(ava.value(2), "def");
+/// assert_eq!(ava.value(3), "abc");
+///
+/// ```
+pub type StringRunBuilder<K> = GenericByteRunBuilder<K, Utf8Type>;
+
+/// Array builder for [`RunArray`] that encodes large strings ([`LargeUtf8Type`]). See [`StringRunBuilder`] for a usage example.
+pub type LargeStringRunBuilder<K> = GenericByteRunBuilder<K, LargeUtf8Type>;
+
+/// Array builder for [`RunArray`] that encodes binary values ([`BinaryType`]).
+///
+/// ```
+/// // Create a run-end encoded array whose run ends are stored as `i16`.
+/// // The encoded data is binary values.
+///
+/// # use arrow_array::builder::BinaryRunBuilder;
+/// # use arrow_array::{BinaryArray, Int16Array};
+/// # use arrow_array::types::Int16Type;
+/// # use arrow_array::cast::as_generic_binary_array;
+///
+/// let mut builder = BinaryRunBuilder::<Int16Type>::new();
+///
+/// // The builder builds the run array value by value; equal neighbours share one run
+/// builder.append_value(b"abc");
+/// builder.append_null();
+/// builder.append_value(b"def");
+/// builder.append_value(b"def");
+/// builder.append_value(b"abc");
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///   array.run_ends(),
+///   &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+/// );
+///
+/// // Values are polymorphic and so require a downcast.
+/// let av = array.values();
+/// let ava: &BinaryArray = as_generic_binary_array::<i32>(av.as_ref());
+///
+/// assert_eq!(ava.value(0), b"abc");
+/// assert!(av.is_null(1));
+/// assert_eq!(ava.value(2), b"def");
+/// assert_eq!(ava.value(3), b"abc");
+///
+/// ```
+pub type BinaryRunBuilder<K> = GenericByteRunBuilder<K, BinaryType>;
+
+/// Array builder for [`RunArray`] that encodes large binary values ([`LargeBinaryType`]).
+/// See the documentation of [`BinaryRunBuilder`] for a usage example.
+pub type LargeBinaryRunBuilder<K> = GenericByteRunBuilder<K, LargeBinaryType>;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::array::Array;
+    use crate::types::Int16Type;
+    use crate::GenericByteArray;
+    use crate::Int16Array;
+    use crate::Int16RunArray;
+
+    fn test_bytes_run_builder<T>(values: Vec<&T::Native>)
+    where
+        T: ByteArrayType,
+        <T as ByteArrayType>::Native: PartialEq,
+        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
+    {
+        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_null();
+        builder.append_null();
+        builder.append_value(values[1]);
+        builder.append_value(values[1]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        builder.append_value(values[2]);
+        let run_array = builder.finish();
+
+        assert_eq!(run_array.len(), 11);
+        assert_eq!(run_array.null_count(), 0); // the null run is only visible in the values child
+
+        assert_eq!(
+            run_array.run_ends(),
+            &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)])
+        );
+
+        // The values child is type-erased, so downcast it to the concrete byte array.
+        let run_values = run_array.values();
+        let typed_values: &GenericByteArray<T> =
+            run_values.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(*typed_values.value(0), *values[0]);
+        assert!(typed_values.is_null(1));
+        assert_eq!(*typed_values.value(2), *values[1]);
+        assert_eq!(*typed_values.value(3), *values[2]);
+    }
+
+    #[test]
+    fn test_string_run_builder() {
+        test_bytes_run_builder::<Utf8Type>(vec!["abc", "def", "ghi"]);
+    }
+
+    #[test]
+    fn test_string_run_builder_with_empty_strings() {
+        test_bytes_run_builder::<Utf8Type>(vec!["abc", "", "ghi"]);
+    }
+
+    #[test]
+    fn test_binary_run_builder() {
+        test_bytes_run_builder::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
+    }
+
+    fn test_bytes_run_builder_finish_cloned<T>(values: Vec<&T::Native>)
+    where
+        T: ByteArrayType,
+        <T as ByteArrayType>::Native: PartialEq,
+        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
+    {
+        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
+
+        builder.append_value(values[0]);
+        builder.append_null();
+        builder.append_value(values[1]);
+        builder.append_value(values[1]);
+        builder.append_value(values[0]);
+        let mut run_array: Int16RunArray = builder.finish_cloned();
+
+        assert_eq!(run_array.len(), 5);
+        assert_eq!(run_array.null_count(), 0);
+
+        assert_eq!(
+            run_array.run_ends(),
+            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
+        );
+
+        // The values child is type-erased, so downcast it to the concrete byte array.
+        let cloned_values = run_array.values();
+        let typed_cloned: &GenericByteArray<T> =
+            cloned_values.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(typed_cloned.value(0), values[0]);
+        assert!(typed_cloned.is_null(1));
+        assert_eq!(typed_cloned.value(2), values[1]);
+        assert_eq!(typed_cloned.value(3), values[0]);
+
+        // Keep appending the value that was pending at `finish_cloned` (`values[0]`) and check
+        // that it still ends up as a single run in the final output.
+        builder.append_value(values[0]);
+        builder.append_value(values[0]);
+        builder.append_value(values[1]);
+        run_array = builder.finish();
+
+        assert_eq!(run_array.len(), 8);
+        assert_eq!(run_array.null_count(), 0);
+
+        assert_eq!(
+            run_array.run_ends(),
+            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), Some(8),])
+        );
+
+        // The values child is type-erased, so downcast it to the concrete byte array.
+        let final_values = run_array.values();
+        let typed_final: &GenericByteArray<T> =
+            final_values.as_any().downcast_ref::<GenericByteArray<T>>().unwrap();
+
+        assert_eq!(typed_final.value(0), values[0]);
+        assert!(typed_final.is_null(1));
+        assert_eq!(typed_final.value(2), values[1]);
+        // The run that straddles the `finish_cloned` call is stored only once.
+        assert_eq!(typed_final.value(3), values[0]);
+        assert_eq!(typed_final.value(4), values[1]);
+    }
+
+    #[test]
+    fn test_string_run_builder_finish_cloned() {
+        test_bytes_run_builder_finish_cloned::<Utf8Type>(vec!["abc", "def", "ghi"]);
+    }
+
+    #[test]
+    fn test_binary_run_builder_finish_cloned() {
+        test_bytes_run_builder_finish_cloned::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
+    }
+}
diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs
index 820ecd23b..fc2454635 100644
--- a/arrow-array/src/builder/mod.rs
+++ b/arrow-array/src/builder/mod.rs
@@ -39,10 +39,14 @@ mod primitive_builder;
 pub use primitive_builder::*;
 mod primitive_dictionary_builder;
 pub use primitive_dictionary_builder::*;
+mod primitive_run_builder;
+pub use primitive_run_builder::*;
 mod struct_builder;
 pub use struct_builder::*;
 mod generic_bytes_dictionary_builder;
 pub use generic_bytes_dictionary_builder::*;
+mod generic_byte_run_builder;
+pub use generic_byte_run_builder::*;
 mod union_builder;
 pub use union_builder::*;
 
diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs
new file mode 100644
index 000000000..82c46abfa
--- /dev/null
+++ b/arrow-array/src/builder/primitive_run_builder.rs
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use crate::{types::RunEndIndexType, ArrayRef, ArrowPrimitiveType, RunArray};
+
+use super::{ArrayBuilder, PrimitiveBuilder};
+
+use arrow_buffer::ArrowNativeType;
+
+/// Array builder for [`RunArray`] that encodes primitive values.
+///
+/// # Example:
+///
+/// ```
+///
+/// # use arrow_array::builder::PrimitiveRunBuilder;
+/// # use arrow_array::cast::as_primitive_array;
+/// # use arrow_array::types::{UInt32Type, Int16Type};
+/// # use arrow_array::{Array, UInt32Array, Int16Array};
+///
+/// let mut builder =
+/// PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
+/// builder.append_value(1234);
+/// builder.append_value(1234);
+/// builder.append_value(1234);
+/// builder.append_null();
+/// builder.append_value(5678);
+/// builder.append_value(5678);
+/// let array = builder.finish();
+///
+/// assert_eq!(
+///     array.run_ends(),
+///     &Int16Array::from(vec![Some(3), Some(4), Some(6)])
+/// );
+///
+/// let av = array.values();
+///
+/// assert!(!av.is_null(0));
+/// assert!(av.is_null(1));
+/// assert!(!av.is_null(2));
+///
+/// // Values are polymorphic and so require a downcast.
+/// let ava: &UInt32Array = as_primitive_array::<UInt32Type>(av.as_ref());
+///
+/// assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)]));
+/// ```
+#[derive(Debug)]
+pub struct PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    run_ends_builder: PrimitiveBuilder<R>, // run end of every run written so far
+    values_builder: PrimitiveBuilder<V>, // value (or null) of every run written so far
+    current_value: Option<V::Native>, // value of the run being accumulated; `None` encodes null
+    current_run_end_index: usize, // number of logical values appended so far
+    prev_run_end_index: usize, // `current_run_end_index` at the time the last run was written
+}
+
+impl<R, V> Default for PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    fn default() -> Self {
+        Self::new() // the default builder is the empty builder returned by `new`
+    }
+}
+
+impl<R, V> PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Creates a new, empty `PrimitiveRunBuilder`
+    pub fn new() -> Self {
+        Self {
+            run_ends_builder: PrimitiveBuilder::new(),
+            values_builder: PrimitiveBuilder::new(),
+            current_value: None,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+
+    /// Creates a new, empty `PrimitiveRunBuilder` with the provided capacity
+    ///
+    /// `capacity`: the expected number of runs; it is passed to both the run-ends and values builders.
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            run_ends_builder: PrimitiveBuilder::with_capacity(capacity),
+            values_builder: PrimitiveBuilder::with_capacity(capacity),
+            current_value: None,
+            current_run_end_index: 0,
+            prev_run_end_index: 0,
+        }
+    }
+}
+
+impl<R, V> ArrayBuilder for PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Returns the builder as a non-mutable `Any` reference.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+
+    /// Returns the length of the logical array encoded so far, i.e. the number of
+    /// appended values (not the number of runs).
+    fn len(&self) -> usize {
+        self.current_run_end_index
+    }
+
+    /// Returns whether no value has been appended to the logical array yet
+    fn is_empty(&self) -> bool {
+        self.current_run_end_index == 0
+    }
+
+    /// Builds the array and resets this builder.
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish()) // resolves to the inherent `finish`, so this does not recurse
+    }
+
+    /// Builds the array without resetting the builder.
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned()) // resolves to the inherent `finish_cloned`
+    }
+}
+
+impl<R, V> PrimitiveRunBuilder<R, V>
+where
+    R: RunEndIndexType,
+    V: ArrowPrimitiveType,
+{
+    /// Appends an optional value to the logical array; a value equal to the previous one extends its run.
+    pub fn append_option(&mut self, value: Option<V::Native>) {
+        if self.current_run_end_index == 0 {
+            self.current_run_end_index = 1;
+            self.current_value = value;
+            return;
+        }
+        if self.current_value != value {
+            self.append_run_end();
+            self.current_value = value;
+        }
+
+        self.current_run_end_index += 1;
+    }
+
+    /// Appends value to the logical array encoded by the run-ends array.
+    pub fn append_value(&mut self, value: V::Native) {
+        self.append_option(Some(value))
+    }
+
+    /// Appends null to the logical array encoded by the run-ends array.
+    pub fn append_null(&mut self) {
+        self.append_option(None)
+    }
+
+    /// Creates the RunArray and resets the builder so that it can be reused.
+    /// Panics if RunArray cannot be built.
+    pub fn finish(&mut self) -> RunArray<R> {
+        // write the last run end to the array.
+        self.append_run_end();
+
+        // reset all run state; a stale `prev_run_end_index` makes a later run look already written.
+        self.current_value = None;
+        self.current_run_end_index = 0;
+        self.prev_run_end_index = 0;
+
+        // build the run encoded array by adding run_ends and values array as its children.
+        let run_ends_array = self.run_ends_builder.finish();
+        let values_array = self.values_builder.finish();
+        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    /// Creates the RunArray without resetting the builder.
+    /// Panics if RunArray cannot be built.
+    pub fn finish_cloned(&self) -> RunArray<R> {
+        let mut run_ends_array = self.run_ends_builder.finish_cloned();
+        let mut values_array = self.values_builder.finish_cloned();
+
+        // Add the pending run, if any, to the cloned arrays only so that `self` is not mutated.
+        if self.prev_run_end_index != self.current_run_end_index {
+            let mut run_end_builder = run_ends_array.into_builder().unwrap();
+            let mut values_builder = values_array.into_builder().unwrap();
+            self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
+            run_ends_array = run_end_builder.finish();
+            values_array = values_builder.finish();
+        }
+
+        RunArray::try_new(&run_ends_array, &values_array).unwrap()
+    }
+
+    // Writes the pending run (its run end and its value or null) to the child builders.
+    fn append_run_end(&mut self) {
+        // nothing is pending: empty array, or called again without appending any value.
+        if self.prev_run_end_index == self.current_run_end_index {
+            return;
+        }
+        let run_end_index = self.run_end_index_as_native();
+        self.run_ends_builder.append_value(run_end_index);
+        self.values_builder.append_option(self.current_value);
+        self.prev_run_end_index = self.current_run_end_index;
+    }
+
+    // Same as `append_run_end`, but writes to the given builders and skips the pending-run check
+    // (the caller does it). Used in `finish_cloned`, which must not mutate `self`.
+    fn append_run_end_with_builders(
+        &self,
+        run_ends_builder: &mut PrimitiveBuilder<R>,
+        values_builder: &mut PrimitiveBuilder<V>,
+    ) {
+        let run_end_index = self.run_end_index_as_native();
+        run_ends_builder.append_value(run_end_index);
+        values_builder.append_option(self.current_value);
+    }
+
+    // Converts `current_run_end_index` to `R::Native`; panics if the conversion fails.
+    fn run_end_index_as_native(&self) -> R::Native {
+        R::Native::from_usize(self.current_run_end_index).unwrap_or_else(|| panic!(
+            "Cannot convert `current_run_end_index` {} from `usize` to native form of arrow datatype {}",
+            self.current_run_end_index, R::DATA_TYPE
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::builder::PrimitiveRunBuilder;
+    use crate::cast::as_primitive_array;
+    use crate::types::{Int16Type, UInt32Type};
+    use crate::{Array, Int16Array, UInt32Array};
+
+    #[test]
+    fn test_primitive_ree_array_builder() {
+        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
+        // Logical array: 1234 x 3, null x 1, 5678 x 2.
+        for _ in 0..3 {
+            builder.append_value(1234);
+        }
+        builder.append_null();
+        (0..2).for_each(|_| builder.append_value(5678));
+
+        let run_array = builder.finish();
+
+        assert_eq!(run_array.null_count(), 0);
+        assert_eq!(run_array.len(), 6);
+
+        assert_eq!(
+            run_array.run_ends(),
+            &Int16Array::from(vec![Some(3), Some(4), Some(6)])
+        );
+
+        let run_values = run_array.values();
+
+        assert!(!run_values.is_null(0));
+        assert!(run_values.is_null(1));
+        assert!(!run_values.is_null(2));
+
+        // The values child is type-erased, so downcast it to the concrete primitive array.
+        let typed_values: &UInt32Array = as_primitive_array::<UInt32Type>(run_values.as_ref());
+
+        assert_eq!(typed_values, &UInt32Array::from(vec![Some(1234), None, Some(5678)]));
+    }
+}
diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs
index 7c41a469e..fc02c0e5a 100644
--- a/arrow-array/src/types.rs
+++ b/arrow-array/src/types.rs
@@ -240,6 +240,31 @@ impl ArrowDictionaryKeyType for UInt32Type {}
 
 impl ArrowDictionaryKeyType for UInt64Type {}
 
+mod run {
+    use super::*;
+    /// Sealing marker: it lives in a private module, so `RunEndIndexType` cannot be implemented elsewhere.
+    pub trait RunEndTypeSealed {}
+
+    impl RunEndTypeSealed for Int16Type {}
+
+    impl RunEndTypeSealed for Int32Type {}
+
+    impl RunEndTypeSealed for Int64Type {}
+}
+
+/// A subtype of primitive type that can be used as the run-ends index
+/// of a `RunArray`; implemented for `Int16Type`, `Int32Type` and `Int64Type`.
+/// See <https://arrow.apache.org/docs/format/Columnar.html>
+///
+/// Note: This trait is sealed (via `run::RunEndTypeSealed`) to avoid accidental misuse.
+pub trait RunEndIndexType: ArrowPrimitiveType + run::RunEndTypeSealed {}
+
+impl RunEndIndexType for Int16Type {}
+
+impl RunEndIndexType for Int32Type {}
+
+impl RunEndIndexType for Int64Type {}
+
 /// A subtype of primitive type that represents temporal values.
 pub trait ArrowTemporalType: ArrowPrimitiveType {}
 
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 258ee082d..07bbc6642 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -198,9 +198,9 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             ],
             _ => unreachable!(),
         },
-        DataType::FixedSizeList(_, _) | DataType::Struct(_) => {
-            [empty_buffer, MutableBuffer::new(0)]
-        }
+        DataType::FixedSizeList(_, _)
+        | DataType::Struct(_)
+        | DataType::RunEndEncoded(_, _) => [empty_buffer, MutableBuffer::new(0)],
         DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => [
             MutableBuffer::new(capacity * mem::size_of::<u8>()),
             empty_buffer,
@@ -724,6 +724,12 @@ impl ArrayData {
             DataType::Dictionary(_, data_type) => {
                 vec![Self::new_empty(data_type)]
             }
+            DataType::RunEndEncoded(run_ends, values) => {
+                vec![
+                    Self::new_empty(run_ends.data_type()),
+                    Self::new_empty(values.data_type()),
+                ]
+            }
         };
 
         // Data was constructed correctly above
@@ -853,6 +859,19 @@ impl ArrayData {
                     )));
                 }
             }
+            DataType::RunEndEncoded(run_ends_type, _) => {
+                if run_ends_type.is_nullable() {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "The nullable should be set to false for the field defining run_ends array.".to_string()
+                    ));
+                }
+                if !DataType::is_run_ends_type(run_ends_type.data_type()) {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "RunArray run_ends types must be Int16, Int32 or Int64, but was {}",
+                        run_ends_type.data_type()
+                    )));
+                }
+            }
             _ => {}
         };
 
@@ -998,6 +1017,25 @@ impl ArrayData {
                 }
                 Ok(())
             }
+            DataType::RunEndEncoded(run_ends_field, values_field) => {
+                self.validate_num_child_data(2)?;
+                let run_ends_data =
+                    self.get_valid_child_data(0, run_ends_field.data_type())?;
+                let values_data =
+                    self.get_valid_child_data(1, values_field.data_type())?;
+                if run_ends_data.len != values_data.len {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "The run_ends array length should be the same as values array length. Run_ends array length is {}, values array length is {}",
+                        run_ends_data.len, values_data.len
+                    )));
+                }
+                if run_ends_data.null_count() > 0 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Found null values in run_ends array. The run_ends array should not have null values.".to_string(),
+                    ));
+                }
+                Ok(())
+            }
             DataType::Union(fields, _, mode) => {
                 self.validate_num_child_data(fields.len())?;
 
@@ -1286,6 +1324,15 @@ impl ArrayData {
                     _ => unreachable!(),
                 }
             }
+            DataType::RunEndEncoded(run_ends, _values) => {
+                let run_ends_data = self.child_data()[0].clone();
+                match run_ends.data_type() {
+                    DataType::Int16 => run_ends_data.check_run_ends::<i16>(self.len()),
+                    DataType::Int32 => run_ends_data.check_run_ends::<i32>(self.len()),
+                    DataType::Int64 => run_ends_data.check_run_ends::<i64>(self.len()),
+                    _ => unreachable!(),
+                }
+            }
             _ => {
                 // No extra validation check required for other types
                 Ok(())
@@ -1446,6 +1493,50 @@ impl ArrayData {
         })
     }
 
+    /// Validates that the run_ends values are strictly positive and strictly increasing, and
+    /// that the last run end (0 when there are no runs) equals `array_len`.
+    fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
+    where
+        T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
+    {
+        let run_ends = self.typed_buffer::<T>(0, self.len())?;
+        let mut previous_end: i64 = 0_i64;
+        for (position, &raw_end) in run_ends.iter().enumerate() {
+            let run_end: i64 = raw_end.try_into().map_err(|_| {
+                ArrowError::InvalidArgumentError(format!(
+                    "Value at position {} out of bounds: {} (can not convert to i64)",
+                    position, raw_end
+                ))
+            })?;
+            if run_end <= 0_i64 {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "The values in run_ends array should be strictly positive. Found value {} at index {} that does not match the criteria.",
+                    run_end,
+                    position
+                )));
+            }
+            if position > 0 && run_end <= previous_end {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "The values in run_ends array should be strictly increasing. Found value {} at index {} with previous value {} that does not match the criteria.",
+                    run_end,
+                    position,
+                    previous_end
+                )));
+            }
+
+            previous_end = run_end;
+        }
+
+        if previous_end.as_usize() != array_len {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "The length of array does not match the last value in the run_ends array. The last value of run_ends array is {} and length of array is {}.",
+                previous_end,
+                array_len
+            )));
+        }
+        Ok(())
+    }
+
     /// Returns true if this `ArrayData` is equal to `other`, using pointer comparisons
     /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
     /// return false when the arrays are logically equal
@@ -1542,6 +1633,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
         DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
         DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
+        DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data,
         DataType::Union(_, _, mode) => {
             let type_ids = BufferSpec::FixedWidth {
                 byte_width: size_of::<i8>(),
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index 85c595cfe..aff61e3d3 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -137,6 +137,7 @@ fn equal_values(
         },
         DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
+        DataType::RunEndEncoded(_, _) => todo!(),
     }
 }
 
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index 6a8c89d25..2a24b1cc2 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -230,6 +230,7 @@ fn build_extend(array: &ArrayData) -> Extend {
             UnionMode::Sparse => union::build_extend_sparse(array),
             UnionMode::Dense => union::build_extend_dense(array),
         },
+        DataType::RunEndEncoded(_, _) => todo!(),
     }
 }
 
@@ -281,6 +282,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
             UnionMode::Sparse => union::extend_nulls_sparse,
             UnionMode::Dense => union::extend_nulls_dense,
         },
+        DataType::RunEndEncoded(_, _) => todo!(),
     })
 }
 
@@ -473,6 +475,20 @@ impl<'a> MutableArrayData<'a> {
                     })
                     .collect::<Vec<_>>(),
             },
+            DataType::RunEndEncoded(_, _) => {
+                let run_ends_child = arrays
+                    .iter()
+                    .map(|array| &array.child_data()[0])
+                    .collect::<Vec<_>>();
+                let value_child = arrays
+                    .iter()
+                    .map(|array| &array.child_data()[1])
+                    .collect::<Vec<_>>();
+                vec![
+                    MutableArrayData::new(run_ends_child, false, array_capacity),
+                    MutableArrayData::new(value_child, use_nulls, array_capacity),
+                ]
+            }
             DataType::FixedSizeList(_, _) => {
                 let childs = arrays
                     .iter()
diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs
index dd0b95b0a..c2e326b4f 100644
--- a/arrow-integration-test/src/datatype.rs
+++ b/arrow-integration-test/src/datatype.rs
@@ -357,6 +357,7 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value {
         DataType::Map(_, keys_sorted) => {
             json!({"name": "map", "keysSorted": keys_sorted})
         }
+        DataType::RunEndEncoded(_, _) => todo!(),
     }
 }
 
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index a60a19b86..305bb943c 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -711,6 +711,7 @@ pub(crate) fn get_fb_field_type<'a>(
                 children: Some(fbb.create_vector(&children[..])),
             }
         }
+        RunEndEncoded(_, _) => todo!(),
         Map(map_field, keys_sorted) => {
             let child = build_field(fbb, map_field);
             let mut field_type = crate::MapBuilder::new(fbb);
diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index da1c20ddb..1e5c1321c 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -242,6 +242,18 @@ pub enum DataType {
     /// child fields may be respectively "entries", "key", and "value", but this is
     /// not enforced.
     Map(Box<Field>, bool),
+    /// A run-end encoding (REE) is a variation of run-length encoding (RLE). These
+    /// encodings are well-suited for representing data containing sequences of the
+    /// same value, called runs. Each run is represented as a value and an integer giving
+    /// the index in the array where the run ends.
+    ///
+    /// A run-end encoded array has no buffers by itself, but has two child arrays. The
+    /// first child array, called the run ends array, holds either 16, 32, or 64-bit
+    /// signed integers. The actual values of each run are held in the second child array.
+    ///
+    /// These child arrays are prescribed the standard names of "run_ends" and "values"
+    /// respectively.
+    RunEndEncoded(Box<Field>, Box<Field>),
 }
 
 /// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds.
@@ -346,6 +358,13 @@ impl DataType {
         )
     }
 
+    /// Returns true if this type is a valid run-ends type for a `RunArray` (`Int16`, `Int32` or `Int64`)
+    #[inline]
+    pub fn is_run_ends_type(&self) -> bool {
+        use DataType::*;
+        matches!(self, Int16 | Int32 | Int64)
+    }
+
     /// Returns true if this type is nested (List, FixedSizeList, LargeList, Struct, Union,
     /// or Map), or a dictionary of a nested type
     pub fn is_nested(&self) -> bool {
@@ -438,6 +457,10 @@ impl DataType {
                         + (std::mem::size_of::<Field>() * fields.capacity())
                 }
                 DataType::Dictionary(dt1, dt2) => dt1.size() + dt2.size(),
+                DataType::RunEndEncoded(run_ends, values) => {
+                    run_ends.size() - std::mem::size_of_val(run_ends) + values.size()
+                        - std::mem::size_of_val(values)
+                }
             }
     }
 }
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index ea60572b3..6213af8bc 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -41,6 +41,7 @@ pub enum ArrowError {
     /// Error during import or export to/from the C Data Interface
     CDataInterface(String),
     DictionaryKeyOverflowError,
+    RunEndIndexOverflowError,
 }
 
 impl ArrowError {
@@ -96,6 +97,9 @@ impl Display for ArrowError {
             ArrowError::DictionaryKeyOverflowError => {
                 write!(f, "Dictionary key bigger than the key type")
             }
+            ArrowError::RunEndIndexOverflowError => {
+                write!(f, "Run end encoded array index overflow error")
+            }
         }
     }
 }
diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs
index a3275dcb3..dc3ab3d62 100644
--- a/arrow-schema/src/field.rs
+++ b/arrow-schema/src/field.rs
@@ -410,6 +410,7 @@ impl Field {
             | DataType::List(_)
             | DataType::Map(_, _)
             | DataType::Dictionary(_, _)
+            | DataType::RunEndEncoded(_, _)
             | DataType::FixedSizeList(_, _)
             | DataType::FixedSizeBinary(_)
             | DataType::Utf8
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 311981593..c459d40d7 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -360,7 +360,7 @@ fn write_leaves<W: Write>(
         ArrowDataType::Float16 => Err(ParquetError::ArrowError(
             "Float16 arrays not supported".to_string(),
         )),
-        ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) => {
+        ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) | ArrowDataType::RunEndEncoded(_, _) => {
             Err(ParquetError::NYI(
                 format!(
                     "Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 2ca4b7ef8..d81d6a69b 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -507,6 +507,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             let dict_field = Field::new(name, *value.clone(), field.is_nullable());
             arrow_to_parquet_type(&dict_field)
         }
+        DataType::RunEndEncoded(_, _) => Err(arrow_err!("Converting RunEndEncodedType to parquet not supported",))
     }
 }