You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/19 21:33:26 UTC

[GitHub] [arrow-rs] tustvold commented on a change in pull request #1180: Preserve dictionary encoding when decoding parquet into Arrow arrays (#171)

tustvold commented on a change in pull request #1180:
URL: https://github.com/apache/arrow-rs/pull/1180#discussion_r788157513



##########
File path: parquet/src/arrow/array_reader/dictionary_buffer.rs
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+///
+/// `K` is the dictionary key type and `V` the offset type of the byte arrays.
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    /// Dictionary encoded data
+    Dict {
+        /// One key per slot, each an index into `values`
+        keys: ScalarBuffer<K>,
+        /// The dictionary, shared (via `Arc`) with any buffers split off from this one
+        values: Arc<ArrayData>,
+    },
+    /// Plain, non-dictionary encoded data
+    Values {
+        /// The byte arrays, held in an [`OffsetBuffer`]
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    /// Creates an empty buffer in the plain `Values` state
+    ///
+    /// While still empty it may be switched to the dictionary encoded state by
+    /// `as_keys`.
+    fn default() -> Self {
+        let values = OffsetBuffer::default();
+        Self::Values { values }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    /// Returns the number of buffered slots (keys when dictionary encoded,
+    /// otherwise values)
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns `true` if no slots are buffered
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns a mutable reference to a keys array encoded against `dictionary`
+    ///
+    /// If this buffer holds no data it is (re)pointed at `dictionary` first.
+    ///
+    /// Returns `None` if the dictionary needs to be recomputed, i.e. this buffer
+    /// already holds plain values, or keys encoded against a different dictionary
+    ///
+    /// # Panics
+    ///
+    /// Panics if `dictionary` has more entries than `K` can index
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        // Keys range over `0..len`, so only the largest key `len - 1` must fit in `K`;
+        // requiring `len` itself to fit would reject e.g. a 128 entry dictionary for `i8`
+        let max_key = dictionary.len().checked_sub(1);
+        assert!(max_key.map_or(true, |max| K::from_usize(max).is_some()));
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, the keys are first decoded against
+    /// the dictionary into a new plain values buffer, which replaces `self`
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from decoding the keys against the dictionary
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                // SAFETY: assumes the dictionary is a variable length byte array whose
+                // first buffer holds offsets of type `V`; this is not checked here
+                let offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                // Offsets index absolutely into the value bytes, so only the offsets
+                // need adjusting when the dictionary array is itself a slice
+                let dict_offsets = &offsets[values.offset()..];
+                let dict_values = values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the key of a non-null slot is out of bounds for the
+    /// dictionary, or if plain values cannot be cast to `data_type` (for instance
+    /// when there are more distinct values than the key type can index)
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data_type` is not [`ArrowType::Dictionary`]
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let dict_len = values.len();
+                // Negative keys yield `None` from `to_usize` and are rejected
+                let in_bounds =
+                    |key: &K| ArrowNativeType::to_usize(key).map_or(false, |k| k < dict_len);
+
+                // Fast path: every key, including those in null slots, is in bounds.
+                // It may be possible to use SIMD here
+                if !keys.as_slice().iter().all(&in_bounds) {
+                    // Null slots may hold arbitrary keys (e.g. the default `0`, which is
+                    // out of bounds for an empty dictionary), so only keys of valid
+                    // slots are required to be in bounds
+                    let null_bits = null_buffer.as_ref().map(|b| b.as_slice());
+                    let is_valid = |idx: usize| {
+                        null_bits.map_or(true, |bits| {
+                            bits.get(idx / 8)
+                                .map_or(true, |byte| byte & (1 << (idx % 8)) != 0)
+                        })
+                    };
+                    let has_invalid_key = keys
+                        .as_slice()
+                        .iter()
+                        .enumerate()
+                        .any(|(idx, key)| is_valid(idx) && !in_bounds(key));
+
+                    if has_invalid_key {
+                        return Err(general_err!(
+                            "dictionary key beyond bounds of dictionary: 0..{}",
+                            dict_len
+                        ));
+                    }
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = if cfg!(debug_assertions) {
+                    builder.build().unwrap()
+                } else {
+                    // SAFETY: keys of all non-null slots were bounds-checked above and
+                    // the layout (one key buffer, one dictionary child) is built right
+                    // here; assumes `values` itself is well-formed for `data_type`
+                    unsafe { builder.build_unchecked() }
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This computes a new dictionary. It can fail, e.g. if there are more
+                // distinct values than the key type can index, so report an error
+                // rather than panicking
+                arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .map_err(|e| general_err!("failed to cast values to dictionary: {}", e))
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    /// Reports effectively unbounded capacity (`usize::MAX`)
+    ///
+    /// NOTE(review): presumably no fixed limit applies because `spare_capacity_mut`
+    /// hands out the buffer itself rather than a pre-sized slice, and decoders grow
+    /// it as they write — confirm against the decoders
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    /// Spreads the `values_read` entries decoded at `read_offset` across
+    /// `levels_read` slots according to `valid_mask`, leaving gaps for nulls
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Values { values } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+            Self::Dict { keys, .. } => {
+                // Grow the keys to cover every level before shifting them into place
+                let padded_len = read_offset + levels_read;
+                keys.resize(padded_len);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    /// Removes and returns the first `len` slots, leaving the rest buffered
+    ///
+    /// When dictionary encoded, both halves share the same dictionary.
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Values { values } => {
+                let head = values.split_off(len);
+                Self::Values { values: head }
+            }
+            Self::Dict { keys, values } => {
+                let head = keys.take(len);
+                Self::Dict {
+                    keys: head,
+                    values: Arc::clone(values),
+                }
+            }
+        }
+    }
+
+    /// Returns the buffer itself as the slice to decode into; `_batch_size` is unused
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    /// Sets the number of buffered slots to `len`
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Values { values } => values.set_len(len),
+            Self::Dict { keys, .. } => keys.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let d1 = Arc::new(
+            StringArray::from(vec!["hello", "world", "", "a", "b"])
+                .data()
+                .clone(),
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]

Review comment:
       It reads 5 non-null values
   
   ```
   [Some("world"), Some("hello"), Some("a"), Some(""), Some("b")]
   ```
   
   It then pads them out based on the null mask
   
   ```
   [None, None, Some("world"), Some("hello"), None, Some("a"), Some(""), Some("b")]
   ```
   
   It then splits off the first four
   
   ```
   [None, None, Some("world"), Some("hello")]
   ```
   
   Leaving behind in the buffer
   
   ```
   [None, Some("a"), Some(""), Some("b")]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org