Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/19 19:31:21 UTC

[GitHub] [arrow-rs] alamb commented on a change in pull request #1180: Preserve dictionary encoding when decoding parquet into Arrow arrays (#171)

alamb commented on a change in pull request #1180:
URL: https://github.com/apache/arrow-rs/pull/1180#discussion_r788028959



##########
File path: parquet/src/arrow/arrow_reader.rs
##########
@@ -470,9 +454,78 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::LargeUtf8),
-            &converter,
+            &large_utf8_converter,
             encodings,
         );
+
+        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
+        for key in &small_key_types {
+            for encoding in encodings {
+                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
+                opts.encoding = *encoding;
+
+                // Cannot run full test suite as keys overflow, run small test instead
+                single_column_reader_test::<
+                    ByteArrayType,
+                    StringArray,
+                    Utf8ArrayConverter,
+                    RandUtf8Gen,
+                >(
+                    opts,
+                    2,
+                    ConvertedType::UTF8,
+                    Some(ArrowDataType::Dictionary(
+                        Box::new(key.clone()),
+                        Box::new(ArrowDataType::Utf8),
+                    )),
+                    &utf8_converter,
+                );
+            }
+        }
+
+        let key_types = [
+            ArrowDataType::Int16,
+            ArrowDataType::UInt16,
+            ArrowDataType::Int32,
+            ArrowDataType::UInt32,
+            ArrowDataType::Int64,
+            ArrowDataType::UInt64,
+        ];
+
+        for key in &key_types {
+            run_single_column_reader_tests::<
+                ByteArrayType,
+                StringArray,
+                Utf8ArrayConverter,
+                RandUtf8Gen,
+            >(
+                2,
+                ConvertedType::UTF8,
+                Some(ArrowDataType::Dictionary(
+                    Box::new(key.clone()),
+                    Box::new(ArrowDataType::Utf8),
+                )),
+                &utf8_converter,
+                encodings,
+            );
+
+            // https://github.com/apache/arrow-rs/issues/1179

Review comment:
       👍 

##########
File path: parquet/src/arrow/arrow_reader.rs
##########
@@ -470,9 +454,78 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::LargeUtf8),
-            &converter,
+            &large_utf8_converter,
             encodings,
         );
+
+        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
+        for key in &small_key_types {
+            for encoding in encodings {
+                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
+                opts.encoding = *encoding;
+
+                // Cannot run full test suite as keys overflow, run small test instead
+                single_column_reader_test::<
+                    ByteArrayType,
+                    StringArray,
+                    Utf8ArrayConverter,
+                    RandUtf8Gen,
+                >(
+                    opts,
+                    2,
+                    ConvertedType::UTF8,
+                    Some(ArrowDataType::Dictionary(
+                        Box::new(key.clone()),
+                        Box::new(ArrowDataType::Utf8),
+                    )),
+                    &utf8_converter,
+                );
+            }
+        }
+
+        let key_types = [
+            ArrowDataType::Int16,
+            ArrowDataType::UInt16,
+            ArrowDataType::Int32,
+            ArrowDataType::UInt32,
+            ArrowDataType::Int64,
+            ArrowDataType::UInt64,
+        ];
+
+        for key in &key_types {

Review comment:
       this is becoming a somewhat epic single test -- I wonder if it would be valuable to split it into smaller tests that can run in parallel (as a future PR)
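
    One hypothetical way to do that split, sketched under the assumption that it lives in the same `tests` module as `run_single_column_reader_tests` (helper names and argument shapes taken from the diff above; the encoding list here is an illustrative subset): a small macro that stamps out one `#[test]` per key type, so the harness can schedule them in parallel:

    ```rust
    // Hypothetical sketch: generate one #[test] per dictionary key type.
    // Assumes run_single_column_reader_tests, Utf8ArrayConverter and the other
    // items from the surrounding tests module are in scope.
    macro_rules! dictionary_key_tests {
        ($($name:ident => $key:expr),+ $(,)?) => {
            $(
                #[test]
                fn $name() {
                    run_single_column_reader_tests::<
                        ByteArrayType,
                        StringArray,
                        Utf8ArrayConverter,
                        RandUtf8Gen,
                    >(
                        2,
                        ConvertedType::UTF8,
                        Some(ArrowDataType::Dictionary(
                            Box::new($key),
                            Box::new(ArrowDataType::Utf8),
                        )),
                        &Utf8ArrayConverter {},
                        &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
                    );
                }
            )+
        };
    }

    dictionary_key_tests! {
        dictionary_i16_keys => ArrowDataType::Int16,
        dictionary_u16_keys => ArrowDataType::UInt16,
        dictionary_i32_keys => ArrowDataType::Int32,
        dictionary_u32_keys => ArrowDataType::UInt32,
        dictionary_i64_keys => ArrowDataType::Int64,
        dictionary_u64_keys => ArrowDataType::UInt64,
    }
    ```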

##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -81,9 +80,15 @@ use crate::schema::types::{
 use crate::schema::visitor::TypeVisitor;
 
 mod byte_array;
+mod byte_array_dictionary;
+mod dictionary_buffer;
 mod offset_buffer;
 
+#[cfg(test)]

Review comment:
       👍 

##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {

Review comment:
       this is a clever abstraction

##########
File path: parquet/src/arrow/array_reader/byte_array_dictionary.rs
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::marker::PhantomData;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
+use crate::arrow::array_reader::{
+    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
+    offset_buffer::OffsetBuffer,
+};
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
+use crate::util::memory::ByteBufferPtr;
+
+/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
+macro_rules! make_reader {
+    (
+        ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) {
+            $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
+        }
+    ) => {
+        match ($k, $v) {
+            $(
+                ($key_arrow, $value_arrow) => {
+                    let reader = GenericRecordReader::new_with_options(
+                        $column_desc,
+                        $null_mask_only,
+                    );
+                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
+                        $pages, $data_type, reader,
+                    )))
+                }
+            )+
+            _ => Err(general_err!(
+                "unsupported data type for byte array dictionary reader - {}",
+                $data_type
+            )),
+        }
+    }
+}
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// This will attempt to preserve any dictionary encoding present in the parquet data
+///
+/// It will be unable to preserve the dictionary encoding if:
+///
+/// * A single read spans across multiple column chunks
+/// * A column chunk contains non-dictionary encoded pages
+///
+/// It is therefore recommended that, if `pages` contains data from multiple column chunks,
+/// the batch size used be a divisor of the row group size
+///
+pub fn make_byte_array_dictionary_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match &data_type {
+        ArrowType::Dictionary(key_type, value_type) => {
+            make_reader! {
+                (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
+                    (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
+                    (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
+                    (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32),
+                    (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
+                    (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) => (u16, i32),
+                    (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
+                    (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => (i16, i32),
+                    (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
+                    (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) => (u32, i32),
+                    (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
+                    (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => (i32, i32),
+                    (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
+                    (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) => (u64, i32),
+                    (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
+                    (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => (i64, i32),
+                    (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
+                }
+            }
+        }
+        _ => Err(general_err!(
+            "invalid non-dictionary data type for byte array dictionary reader - {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
+///
+/// Will attempt to preserve any dictionary encoding present in the parquet data
+struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
+}
+
+impl<K, V> ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            DictionaryBuffer<K, V>,
+            DictionaryDecoder<K, V>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let array = buffer.into_array(null_buffer, &self.data_type)?;
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// If the data is dictionary encoded decode the key data directly, so that the dictionary
+/// encoding can be preserved. Otherwise fallback to decoding using [`ByteArrayDecoder`]
+/// and compute a fresh dictionary in [`ByteArrayDictionaryReader::next_batch`]
+enum MaybeDictionaryDecoder {
+    Dict {
+        decoder: RleDecoder,
+        /// This is a maximum as the null count is not always known, e.g. value data from
+        /// a v1 data page
+        max_remaining_values: usize,
+    },
+    Fallback(ByteArrayDecoder),
+}
+
+/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
+struct DictionaryDecoder<K, V> {
+    /// The current dictionary
+    dict: Option<Arc<ArrayData>>,

Review comment:
       Would it be possible to use 
   
   ```suggestion
        dict: Option<Arc<dyn Array>>,
   ```
   
   ?
   

##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let min = K::from_usize(0).unwrap();
+                let max = K::from_usize(values.len()).unwrap();
+
+                // It may be possible to use SIMD here
+                if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                    return Err(general_err!(
+                        "dictionary key beyond bounds of dictionary: 0..{}",
+                        values.len()
+                    ));
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {

Review comment:
       I like this defensive style of validating data in debug builds while skipping the expensive checks at runtime
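
    As a standalone reference, a minimal sketch of that idiom, assuming an `ArrayDataBuilder` whose invariants the caller has already enforced (here, the key bounds check above):

    ```rust
    use arrow::array::{ArrayData, ArrayDataBuilder};

    /// Minimal sketch of the debug-only validation idiom: run the fully
    /// checked build in debug builds, and skip the O(n) validation in release
    /// builds where the caller has already established the invariants.
    fn build_checked_in_debug(builder: ArrayDataBuilder) -> ArrayData {
        match cfg!(debug_assertions) {
            true => builder.build().unwrap(),
            // SAFETY: buffers validated by the caller (e.g. key bounds above)
            false => unsafe { builder.build_unchecked() },
        }
    }
    ```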

##########
File path: parquet/src/arrow/array_reader/byte_array_dictionary.rs
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::marker::PhantomData;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
+use crate::arrow::array_reader::{
+    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
+    offset_buffer::OffsetBuffer,
+};
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
+use crate::util::memory::ByteBufferPtr;
+
+/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
+macro_rules! make_reader {
+    (
+        ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) {
+            $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
+        }
+    ) => {
+        match ($k, $v) {
+            $(
+                ($key_arrow, $value_arrow) => {
+                    let reader = GenericRecordReader::new_with_options(
+                        $column_desc,
+                        $null_mask_only,
+                    );
+                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
+                        $pages, $data_type, reader,
+                    )))
+                }
+            )+
+            _ => Err(general_err!(
+                "unsupported data type for byte array dictionary reader - {}",
+                $data_type
+            )),
+        }
+    }
+}
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// This will attempt to preserve any dictionary encoding present in the parquet data
+///
+/// It will be unable to preserve the dictionary encoding if:
+///
+/// * A single read spans across multiple column chunks
+/// * A column chunk contains non-dictionary encoded pages
+///
+/// It is therefore recommended that, if `pages` contains data from multiple column chunks,
+/// the batch size used be a divisor of the row group size

Review comment:
       ```suggestion
    /// the read batch size used be a divisor of the row group size
   ```
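
    For readers of the thread, a hedged sketch of following that recommendation with the reader API of this era (`ParquetFileArrowReader` and friends; the file path is hypothetical): pick a batch size that evenly divides the row group size, so no single read spans a column chunk boundary:

    ```rust
    use std::fs::File;
    use std::sync::Arc;

    use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
    use parquet::file::reader::{FileReader, SerializedFileReader};

    fn main() {
        let file = File::open("data.parquet").expect("open file");
        let reader = Arc::new(SerializedFileReader::new(file).expect("parquet"));

        // Any divisor of the row group size keeps each batch inside a single
        // column chunk, which lets the dictionary encoding be preserved.
        let row_group_size = reader.metadata().row_group(0).num_rows() as usize;
        let batch_size = (row_group_size / 2).max(1);

        let mut arrow_reader = ParquetFileArrowReader::new(reader);
        for batch in arrow_reader.get_record_reader(batch_size).expect("reader") {
            let batch = batch.expect("read batch");
            println!("read {} rows", batch.num_rows());
        }
    }
    ```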

##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let min = K::from_usize(0).unwrap();
+                let max = K::from_usize(values.len()).unwrap();
+
+                // It may be possible to use SIMD here
+                if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                    return Err(general_err!(
+                        "dictionary key beyond bounds of dictionary: 0..{}",
+                        values.len()
+                    ));
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This will compute a new dictionary
+                let array = arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible");
+
+                Ok(array)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Dict { keys, .. } => {
+                keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+            Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Dict { keys, values } => Self::Dict {
+                keys: keys.take(len),
+                values: values.clone(),
+            },
+            Self::Values { values } => Self::Values {
+                values: values.split_off(len),
+            },
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Dict { keys, .. } => keys.set_len(len),
+            Self::Values { values } => values.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let d1 = Arc::new(
+            StringArray::from(vec!["hello", "world", "", "a", "b"])
+                .data()
+                .clone(),
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]
+        );
+
+        // Read some data not preserving the dictionary
+
+        let values = buffer.spill_values().unwrap();
+        let read_offset = values.len();
+        values.try_push("bingo".as_bytes(), false).unwrap();
+        values.try_push("bongo".as_bytes(), false).unwrap();
+
+        valid.extend_from_slice(&[false, false, true, false, true]);
+        let null_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+
+        assert_eq!(buffer.len(), 9);
+        let split = buffer.split_off(9);
+
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                Some("a"),
+                Some(""),
+                Some("b"),
+                None,
+                None,
+                Some("bingo"),
+                None,
+                Some("bongo")
+            ]
+        );
+
+        // Can recreate with new dictionary as values is empty
+        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d2 = Arc::new(StringArray::from(vec!["bingo", ""]).data().clone());
+        buffer
+            .as_keys(&d2)
+            .unwrap()
+            .extend_from_slice(&[0, 1, 0, 1]);
+
+        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
+        );
+
+        // Can recreate with new dictionary as keys empty
+        assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d3 = Arc::new(StringArray::from(vec!["bongo"]).data().clone());
+        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
+
+        // Cannot change dictionary as keys not empty
+        let d4 = Arc::new(StringArray::from(vec!["bananas"]).data().clone());
+        assert!(buffer.as_keys(&d4).is_none());
+    }
+
+    #[test]
+    fn test_validates_keys() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let d = Arc::new(StringArray::from(vec!["", "f"]).data().clone());
+        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
+
+        let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
+        assert!(
+            err.contains("dictionary key beyond bounds of dictionary: 0..2"),

Review comment:
       👍 

##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let min = K::from_usize(0).unwrap();
+                let max = K::from_usize(values.len()).unwrap();
+
+                // It may be possible to use SIMD here
+                if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                    return Err(general_err!(
+                        "dictionary key beyond bounds of dictionary: 0..{}",
+                        values.len()
+                    ));
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This will compute a new dictionary
+                let array = arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible");
+
+                Ok(array)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Dict { keys, .. } => {
+                keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+            Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Dict { keys, values } => Self::Dict {
+                keys: keys.take(len),
+                values: values.clone(),
+            },
+            Self::Values { values } => Self::Values {
+                values: values.split_off(len),
+            },
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Dict { keys, .. } => keys.set_len(len),
+            Self::Values { values } => values.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let d1 = Arc::new(
+            StringArray::from(vec!["hello", "world", "", "a", "b"])
+                .data()
+                .clone(),
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]

Review comment:
       Since this was splitting off the first 4 elements (indices `1, 0, 3, 2` and valid `f,f,t,t`), I would have expected the array produced to be `None, None, Some("a"), Some("")`, so I clearly misunderstand something about how this code works
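
    For what it's worth, a minimal sketch of the `pad_nulls` contract as read from the diff above, which appears to explain it: the five decoded keys are not left in the first five slots, they are scattered (back to front) into the slots the validity mask marks as valid, so positions 0..4 end up holding keys `1` ("world") and `0` ("hello"):

    ```rust
    fn main() {
        let decoded = [1, 0, 3, 2, 4]; // keys in decode order
        let valid = [false, false, true, true, false, true, true, true];

        // Scatter the decoded keys into the valid slots, back to front,
        // mirroring the pad_nulls behaviour sketched in the diff above.
        let mut padded = vec![None; valid.len()];
        let mut next = decoded.len();
        for (slot, is_valid) in valid.iter().enumerate().rev() {
            if *is_valid {
                next -= 1;
                padded[slot] = Some(decoded[next]);
            }
        }

        // The first four slots hold keys 1 ("world") and 0 ("hello"),
        // matching the assertion in the test.
        assert_eq!(&padded[..4], &[None, None, Some(1), Some(0)]);
    }
    ```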

##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let min = K::from_usize(0).unwrap();
+                let max = K::from_usize(values.len()).unwrap();
+
+                // It may be possible to use SIMD here
+                if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                    return Err(general_err!(
+                        "dictionary key beyond bounds of dictionary: 0..{}",
+                        values.len()
+                    ));
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This will compute a new dictionary
+                let array = arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible");
+
+                Ok(array)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Dict { keys, .. } => {
+                keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+            Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Dict { keys, values } => Self::Dict {
+                keys: keys.take(len),
+                values: values.clone(),
+            },
+            Self::Values { values } => Self::Values {
+                values: values.split_off(len),
+            },
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Dict { keys, .. } => keys.set_len(len),
+            Self::Values { values } => values.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let d1 = Arc::new(
+            StringArray::from(vec!["hello", "world", "", "a", "b"])
+                .data()
+                .clone(),
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]
+        );
+
+        // Read some data not preserving the dictionary
+
+        let values = buffer.spill_values().unwrap();
+        let read_offset = values.len();
+        values.try_push("bingo".as_bytes(), false).unwrap();
+        values.try_push("bongo".as_bytes(), false).unwrap();
+
+        valid.extend_from_slice(&[false, false, true, false, true]);
+        let null_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+
+        assert_eq!(buffer.len(), 9);
+        let split = buffer.split_off(9);
+
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                Some("a"),
+                Some(""),
+                Some("b"),
+                None,
+                None,
+                Some("bingo"),
+                None,
+                Some("bongo")
+            ]
+        );
+
+        // Can recreate with new dictionary as values is empty
+        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d2 = Arc::new(StringArray::from(vec!["bingo", ""]).data().clone());
+        buffer
+            .as_keys(&d2)
+            .unwrap()
+            .extend_from_slice(&[0, 1, 0, 1]);
+
+        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
+        );
+
+        // Can recreate with new dictionary as keys empty
+        assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d3 = Arc::new(StringArray::from(vec!["bongo"]).data().clone());
+        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
+
+        // Cannot change dictionary as keys not empty
+        let d4 = Arc::new(StringArray::from(vec!["bananas"]).data().clone());
+        assert!(buffer.as_keys(&d4).is_none());
+    }
+
+    #[test]
+    fn test_validates_keys() {

Review comment:
       Note to other reviewers that there is a test for decoding a dictionary whose keys are too large (e.g. an index of `500` when the key type is `Int8`) in `byte_array_dictionary.rs`
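
    A small illustration of the overflow guard that test exercises, assuming the `ArrowNativeType::from_usize` bound check used by `as_keys` above (`key_type_fits` is a hypothetical helper):

    ```rust
    use arrow::datatypes::ArrowNativeType;

    // Hypothetical helper mirroring the assert in as_keys: a key type K can
    // only index a dictionary whose length fits in K.
    fn key_type_fits<K: ArrowNativeType>(dict_len: usize) -> bool {
        K::from_usize(dict_len).is_some()
    }

    fn main() {
        assert!(!key_type_fits::<i8>(500)); // i8 tops out at 127
        assert!(key_type_fits::<i16>(500));
    }
    ```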



