Posted to commits@arrow.apache.org by ne...@apache.org on 2020/02/28 07:11:48 UTC

[arrow] branch master updated: ARROW-5949: [Rust] Implement Dictionary Array

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a7d2d  ARROW-5949: [Rust] Implement Dictionary Array
c7a7d2d is described below

commit c7a7d2dcc46ed06593b994cb54c5eaf9ccd1d21d
Author: andy-thomason <an...@genomicsplc.com>
AuthorDate: Fri Feb 28 09:11:23 2020 +0200

    ARROW-5949: [Rust] Implement Dictionary Array
    
    @nevi-me and @andygrove
    
    I've been working on the structure of the dictionary array, getting it back into the spirit of the original design. The DictionaryArray is now typed by the key native type; it could also be typed by the value array type, since otherwise we can't implement value() in any meaningful way.
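To show why typing the dictionary changes what a lookup can return, here is a minimal, self-contained sketch that does not use the arrow crate. `SimpleDictionary` is a hypothetical type: keys are stored as `Vec<Option<K>>` and the values are fixed to `String`, which is what makes a concrete `value()` return type possible. The `DictionaryArray` in this commit keeps its keys in an Arrow buffer and its values behind an `ArrayRef`, and in the code below it does not expose a `value()` method.

```rust
use std::convert::TryFrom;

/// Hypothetical, simplified model of a dictionary-encoded column:
/// each `keys[i]` is either null or an index into `values`.
pub struct SimpleDictionary<K> {
    pub keys: Vec<Option<K>>,
    pub values: Vec<String>,
}

impl<K: Copy> SimpleDictionary<K>
where
    usize: TryFrom<K>,
{
    /// Decodes slot `i`: `None` for a null key (or a key that does not
    /// index a value, e.g. a negative one), otherwise the value it points at.
    pub fn value(&self, i: usize) -> Option<&str> {
        let key = self.keys[i]?;
        let index = usize::try_from(key).ok()?;
        self.values.get(index).map(|s| s.as_str())
    }

    /// Number of logical slots, i.e. the length of the keys array.
    pub fn len(&self) -> usize {
        self.keys.len()
    }
}
```

Keys `[0, 0, null, 1]` over values `["a", "c"]` decode to `a, a, null, c`, which is the same encoding the nullable `FromIterator` doc example produces.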
    
    I've extended ArrowNativeType for use as an index (to/from usize) and added a value iterator which simplifies iteration over arrays of all kinds.
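The to/from `usize` conversions can be sketched on top of the standard `TryFrom` impls. `IndexKey` and `impl_index_key!` below are hypothetical names, not the `ArrowNativeType` methods added in this commit. The diff only shows `from_usize` being combined with `.ok_or(...)` and `to_usize` with `.unwrap()`, which suggests both return an `Option`; this sketch assumes exactly that.

```rust
use std::convert::TryFrom;

/// Hypothetical trait for using a native integer type as a dictionary
/// index: conversions to and from `usize` that report overflow as `None`
/// instead of silently wrapping.
pub trait IndexKey: Copy + Sized {
    fn from_usize(v: usize) -> Option<Self>;
    fn to_usize(self) -> Option<usize>;
}

// Implement the trait for every integer type via the checked std conversions.
macro_rules! impl_index_key {
    ($($t:ty),*) => {
        $(
            impl IndexKey for $t {
                fn from_usize(v: usize) -> Option<Self> {
                    <$t>::try_from(v).ok()
                }
                fn to_usize(self) -> Option<usize> {
                    usize::try_from(self).ok()
                }
            }
        )*
    };
}

impl_index_key!(i8, i16, i32, i64, u8, u16, u32, u64);
```

With this shape a builder can turn "the next key would not fit in the key type" into an error (`u8::from_usize(256)` is `None`) rather than wrapping around to key 0.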
    
    Closes #6095 from andy-thomason/ARROW-5949 and squashes the following commits:
    
    aebc6258f <andy-thomason> Format the code.
    2365cb306 <andy-thomason> Fix Andy G's issues - thanks.
    b23f3624c <andy-thomason> Hack src/flight to pass tests.
    f212bcfbe <andy-thomason> Rebase to current master.
    0f1bbb3f8 <andy-thomason> Fix merge error.
    adfa037e3 <andy-thomason> Format code.
    03895e64d <andy-thomason> Add ArrowDictionaryKeyType subtype.
    f36cccca8 <andy-thomason> Remove option for nulls in values.
    2351677e4 <andy-thomason> Change unwraps to expects
    d9da53429 <andy-thomason> Format code!
    5c40b5f40 <andy-thomason> Add FromIterator<&str/Option<&str>> to DictionaryArray.
    5f9e1fc5c <andy-thomason> Add StringDictionaryBuilder with null-zero schema.
    b970c79d6 <andy-thomason> Format the code.
    80a05f0fc <andy-thomason> Rebase latest.
    36fbd6b67 <andy-thomason> Remove keys slice and replace with iterator.
    be0041b74 <andy-thomason> Fix comment typo.
    5c975ab90 <andy-thomason> Fix code formatting.
    29b43bc5d <andy-thomason> Fix and test Debug for DictionaryArray
    02bc6d36e <andy-thomason> Fix key issue with primitive dictionary builder.
    69d678180 <andy-thomason> Remove tuple from Dictionary datatype.
    1e0e84923 <andy-thomason> Add missing case to Display for ArrowError
    4e54ef305 <Renjie Liu> ARROW-7312:  Implement std::error::Error for ArrowError.
    1a95bac81 <Neville Dipale> add dictionary_by_field to stream reader
    9d8f29045 <andy-thomason> Change to original layout and add primitive builder.
    bd27cfbce <andy-thomason> Format
    48689be04 <andy-thomason> Specialise the Dictionary Array by key type.
    77f8a33f5 <andy-thomason> Unreachable in create_dictionary_array.
    044aea677 <andy-thomason> Comment change.
    c9d6dd539 <Andy Thomson> Update rust/arrow/src/array/array.rs
    c19969c6d <Andy Thomson> Update rust/arrow/src/array/array.rs
    f7213bb77 <Andy Thomson> Update rust/arrow/src/array/array.rs
    f3416e0fc <Andy Thomson> Update rust/arrow/src/util/integration_util.rs
    1fbd7cc44 <Andy Thomson> Update rust/arrow/src/ipc/file/reader.rs
    497e16809 <andy-thomason> fix schema test
    ee278d55b <andy-thomason> Rebase master
    
    Lead-authored-by: andy-thomason <an...@genomicsplc.com>
    Co-authored-by: Andy Thomson <an...@andythomason.com>
    Co-authored-by: Neville Dipale <ne...@gmail.com>
    Co-authored-by: Renjie Liu <li...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/arrow/src/array/array.rs           | 364 +++++++++++++++++++++++++++++++-
 rust/arrow/src/array/builder.rs         | 277 ++++++++++++++++++++++++
 rust/arrow/src/array/equal.rs           |  56 +++++
 rust/arrow/src/array/mod.rs             |  11 +
 rust/arrow/src/datatypes.rs             | 209 ++++++++++++++++--
 rust/arrow/src/error.rs                 |   4 +
 rust/arrow/src/flight/mod.rs            |   8 +-
 rust/arrow/src/ipc/convert.rs           |  51 ++++-
 rust/arrow/src/ipc/reader.rs            | 199 +++++++++++++++--
 rust/arrow/src/util/integration_util.rs |  84 ++++++++
 10 files changed, 1220 insertions(+), 43 deletions(-)

diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs
index c135fd2..05620e3 100644
--- a/rust/arrow/src/array/array.rs
+++ b/rust/arrow/src/array/array.rs
@@ -19,12 +19,14 @@ use std::any::Any;
 use std::convert::{From, TryFrom};
 use std::fmt;
 use std::io::Write;
+use std::iter::{FromIterator, IntoIterator};
 use std::mem;
 use std::sync::Arc;
 
 use chrono::prelude::*;
 
 use super::*;
+use crate::array::builder::StringDictionaryBuilder;
 use crate::array::equal::JsonEqual;
 use crate::buffer::{Buffer, MutableBuffer};
 use crate::datatypes::DataType::Struct;
@@ -164,6 +166,33 @@ pub fn make_array(data: ArrayDataRef) -> ArrayRef {
         DataType::FixedSizeList(_, _) => {
             Arc::new(FixedSizeListArray::from(data)) as ArrayRef
         }
+        DataType::Dictionary(ref key_type, _) => match key_type.as_ref() {
+            DataType::Int8 => {
+                Arc::new(DictionaryArray::<Int8Type>::from(data)) as ArrayRef
+            }
+            DataType::Int16 => {
+                Arc::new(DictionaryArray::<Int16Type>::from(data)) as ArrayRef
+            }
+            DataType::Int32 => {
+                Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef
+            }
+            DataType::Int64 => {
+                Arc::new(DictionaryArray::<Int64Type>::from(data)) as ArrayRef
+            }
+            DataType::UInt8 => {
+                Arc::new(DictionaryArray::<UInt8Type>::from(data)) as ArrayRef
+            }
+            DataType::UInt16 => {
+                Arc::new(DictionaryArray::<UInt16Type>::from(data)) as ArrayRef
+            }
+            DataType::UInt32 => {
+                Arc::new(DictionaryArray::<UInt32Type>::from(data)) as ArrayRef
+            }
+            DataType::UInt64 => {
+                Arc::new(DictionaryArray::<UInt64Type>::from(data)) as ArrayRef
+            }
+            dt => panic!("Unexpected dictionary key type {:?}", dt),
+        },
         dt => panic!("Unexpected data type {:?}", dt),
     }
 }
@@ -763,7 +792,8 @@ impl<T: ArrowPrimitiveType> From<ArrayDataRef> for PrimitiveArray<T> {
     }
 }
 
-/// Common operations for List types, currently `ListArray`, `FixedSizeListArray` and `BinaryArray`.
+/// Common operations for List types, currently `ListArray`, `FixedSizeListArray`, `BinaryArray`,
+/// `StringArray` and `DictionaryArray`.
 pub trait ListArrayOps {
     fn value_offset_at(&self, i: usize) -> i32;
 }
@@ -1638,6 +1668,229 @@ impl From<(Vec<(Field, ArrayRef)>, Buffer, usize)> for StructArray {
     }
 }
 
+/// A dictionary array where each element is a single value indexed by an integer key.
+/// This is mostly used to represent strings or a limited set of primitive types as integers,
+/// for example when doing NLP analysis or representing chromosomes by name.
+///
+/// Example with nullable data:
+///
+/// ```
+///     use arrow::array::DictionaryArray;
+///     use arrow::datatypes::Int8Type;
+///     let test = vec!["a", "a", "b", "c"];
+///     let array : DictionaryArray<Int8Type> = test.iter().map(|&x| if x == "b" {None} else {Some(x)}).collect();
+///     assert_eq!(array.keys().collect::<Vec<Option<i8>>>(), vec![Some(0), Some(0), None, Some(1)]);
+/// ```
+///
+/// Example without nullable data:
+///
+/// ```
+///
+///     use arrow::array::DictionaryArray;
+///     use arrow::datatypes::Int8Type;
+///     let test = vec!["a", "a", "b", "c"];
+///     let array : DictionaryArray<Int8Type> = test.into_iter().collect();
+///     assert_eq!(array.keys().collect::<Vec<Option<i8>>>(), vec![Some(0), Some(0), Some(1), Some(2)]);
+/// ```
+pub struct DictionaryArray<K: ArrowPrimitiveType> {
+    // Array of keys, much like a PrimitiveArray
+    data: ArrayDataRef,
+
+    // Pointer to the key values.
+    raw_values: RawPtrBox<K::Native>,
+
+    // Array of any values.
+    values: ArrayRef,
+
+    /// Values are ordered.
+    is_ordered: bool,
+}
+
+pub struct NullableIter<'a, T> {
+    data: &'a ArrayDataRef, // TODO: Use a pointer to the null bitmap.
+    ptr: *const T,
+    i: usize,
+    len: usize,
+}
+
+impl<'a, T> std::iter::Iterator for NullableIter<'a, T>
+where
+    T: Clone,
+{
+    type Item = Option<T>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let i = self.i;
+        if i >= self.len {
+            None
+        } else if self.data.is_null(i) {
+            self.i += 1;
+            Some(None)
+        } else {
+            self.i += 1;
+            unsafe { Some(Some((&*self.ptr.offset(i as isize)).clone())) }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.len - self.i, Some(self.len - self.i))
+    }
+
+    fn nth(&mut self, n: usize) -> Option<Self::Item> {
+        let i = self.i;
+        if i + n >= self.len {
+            self.i = self.len;
+            None
+        } else if self.data.is_null(i + n) {
+            self.i += n + 1;
+            Some(None)
+        } else {
+            self.i += n + 1;
+            unsafe { Some(Some((&*self.ptr.offset((i + n) as isize)).clone())) }
+        }
+    }
+}
+
+impl<'a, K: ArrowPrimitiveType> DictionaryArray<K> {
+    /// Return an iterator to the keys of this dictionary.
+    pub fn keys(&'a self) -> NullableIter<'a, K::Native> {
+        NullableIter::<'a, K::Native> {
+            data: &self.data,
+            ptr: unsafe { self.raw_values.get().offset(self.data.offset() as isize) },
+            i: 0,
+            len: self.data.len(),
+        }
+    }
+
+    /// Returns an `ArrayRef` to the dictionary values.
+    pub fn values(&self) -> ArrayRef {
+        self.values.clone()
+    }
+
+    /// Returns a clone of the value type of this dictionary.
+    pub fn value_type(&self) -> DataType {
+        self.values.data().data_type().clone()
+    }
+
+    /// The length of the dictionary is the length of the keys array.
+    pub fn len(&self) -> usize {
+        self.data.len()
+    }
+
+    pub fn is_ordered(&self) -> bool {
+        self.is_ordered
+    }
+}
+
+/// Constructs a `DictionaryArray` from an array data reference.
+impl<T: ArrowPrimitiveType> From<ArrayDataRef> for DictionaryArray<T> {
+    fn from(data: ArrayDataRef) -> Self {
+        assert_eq!(
+            data.buffers().len(),
+            1,
+            "DictionaryArray data should contain a single buffer only (keys)."
+        );
+        assert_eq!(
+            data.child_data().len(),
+            1,
+            "DictionaryArray should contain a single child array (values)."
+        );
+
+        let raw_values = data.buffers()[0].raw_data();
+        let dtype: &DataType = data.data_type();
+        let values = make_array(data.child_data()[0].clone());
+        if let DataType::Dictionary(_, _) = dtype {
+            Self {
+                data: data,
+                raw_values: RawPtrBox::new(raw_values as *const T::Native),
+                values: values,
+                is_ordered: false,
+            }
+        } else {
+            panic!("DictionaryArray must have Dictionary data type.")
+        }
+    }
+}
+
+/// Constructs a `DictionaryArray` from an iterator of optional strings.
+impl<T: ArrowPrimitiveType + ArrowDictionaryKeyType> FromIterator<Option<&'static str>>
+    for DictionaryArray<T>
+{
+    fn from_iter<I: IntoIterator<Item = Option<&'static str>>>(iter: I) -> Self {
+        let iter = iter.into_iter();
+        let (lower, _) = iter.size_hint();
+        let key_builder = PrimitiveBuilder::<T>::new(lower);
+        let value_builder = StringBuilder::new(256);
+        let mut builder = StringDictionaryBuilder::new(key_builder, value_builder);
+        for i in iter {
+            if let Some(i) = i {
+                // Note: impl ... for Result<DictionaryArray<T>> fails with
+                // error[E0117]: only traits defined in the current crate can be implemented for arbitrary types
+                builder
+                    .append(i)
+                    .expect("Unable to append a value to a dictionary array.");
+            } else {
+                builder
+                    .append_null()
+                    .expect("Unable to append a null value to a dictionary array.");
+            }
+        }
+
+        builder.finish()
+    }
+}
+
+/// Constructs a `DictionaryArray` from an iterator of strings.
+impl<T: ArrowPrimitiveType + ArrowDictionaryKeyType> FromIterator<&'static str>
+    for DictionaryArray<T>
+{
+    fn from_iter<I: IntoIterator<Item = &'static str>>(iter: I) -> Self {
+        let iter = iter.into_iter();
+        let (lower, _) = iter.size_hint();
+        let key_builder = PrimitiveBuilder::<T>::new(lower);
+        let value_builder = StringBuilder::new(256);
+        let mut builder = StringDictionaryBuilder::new(key_builder, value_builder);
+        for i in iter {
+            builder
+                .append(i)
+                .expect("Unable to append a value to a dictionary array.");
+        }
+
+        builder.finish()
+    }
+}
+
+impl<T: ArrowPrimitiveType> Array for DictionaryArray<T> {
+    fn as_any(&self) -> &Any {
+        self
+    }
+
+    fn data(&self) -> ArrayDataRef {
+        self.data.clone()
+    }
+
+    fn data_ref(&self) -> &ArrayDataRef {
+        &self.data
+    }
+}
+
+impl<T: ArrowPrimitiveType> fmt::Debug for DictionaryArray<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        const MAX_LEN: usize = 10;
+        let keys: Vec<_> = self.keys().take(MAX_LEN).collect();
+        let ellipsis = if self.keys().count() > MAX_LEN {
+            "..."
+        } else {
+            ""
+        };
+        write!(
+            f,
+            "DictionaryArray {{keys: {:?}{} values: {:?}}}\n",
+            keys, ellipsis, self.values
+        )
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2122,6 +2375,69 @@ mod tests {
     }
 
     #[test]
+    fn test_dictionary_array() {
+        // Construct a value array
+        let value_data = ArrayData::builder(DataType::Int8)
+            .len(8)
+            .add_buffer(Buffer::from(
+                &[10_i8, 11, 12, 13, 14, 15, 16, 17].to_byte_slice(),
+            ))
+            .build();
+
+        // Construct a buffer of keys (indices into the value array):
+        let keys = Buffer::from(&[2_i16, 3, 4].to_byte_slice());
+
+        // Construct a dictionary array from the above two
+        let key_type = DataType::Int16;
+        let value_type = DataType::Int8;
+        let dict_data_type =
+            DataType::Dictionary(Box::new(key_type), Box::new(value_type));
+        let dict_data = ArrayData::builder(dict_data_type.clone())
+            .len(3)
+            .add_buffer(keys.clone())
+            .add_child_data(value_data.clone())
+            .build();
+        let dict_array = Int16DictionaryArray::from(dict_data);
+
+        let values = dict_array.values();
+        assert_eq!(value_data, values.data());
+        assert_eq!(DataType::Int8, dict_array.value_type());
+        assert_eq!(3, dict_array.len());
+
+        // Null count only makes sense in terms of the component arrays.
+        assert_eq!(0, dict_array.null_count());
+        assert_eq!(0, dict_array.values().null_count());
+        assert_eq!(Some(Some(3)), dict_array.keys().nth(1));
+        assert_eq!(Some(Some(4)), dict_array.keys().nth(2));
+
+        assert_eq!(
+            dict_array.keys().collect::<Vec<Option<i16>>>(),
+            vec![Some(2), Some(3), Some(4)]
+        );
+
+        // Now test with a non-zero offset
+        let dict_data = ArrayData::builder(dict_data_type)
+            .len(2)
+            .offset(1)
+            .add_buffer(keys)
+            .add_child_data(value_data.clone())
+            .build();
+        let dict_array = Int16DictionaryArray::from(dict_data);
+
+        let values = dict_array.values();
+        assert_eq!(value_data, values.data());
+        assert_eq!(DataType::Int8, dict_array.value_type());
+        assert_eq!(2, dict_array.len());
+        assert_eq!(Some(Some(3)), dict_array.keys().nth(0));
+        assert_eq!(Some(Some(4)), dict_array.keys().nth(1));
+
+        assert_eq!(
+            dict_array.keys().collect::<Vec<Option<i16>>>(),
+            vec![Some(3), Some(4)]
+        );
+    }
+
+    #[test]
     fn test_fixed_size_list_array() {
         // Construct a value array
         let value_data = ArrayData::builder(DataType::Int32)
@@ -2903,4 +3219,50 @@ mod tests {
         assert!(ret.is_ok());
         assert_eq!(8, ret.ok().unwrap());
     }
+
+    #[test]
+    fn test_dictionary_array_fmt_debug() {
+        let key_builder = PrimitiveBuilder::<UInt8Type>::new(3);
+        let value_builder = PrimitiveBuilder::<UInt32Type>::new(2);
+        let mut builder = PrimitiveDictionaryBuilder::new(key_builder, value_builder);
+        builder.append(12345678).unwrap();
+        builder.append_null().unwrap();
+        builder.append(22345678).unwrap();
+        let array = builder.finish();
+        assert_eq!(
+            "DictionaryArray {keys: [Some(0), None, Some(1)] values: PrimitiveArray<UInt32>\n[\n  12345678,\n  22345678,\n]}\n",
+            format!("{:?}", array)
+        );
+
+        let key_builder = PrimitiveBuilder::<UInt8Type>::new(20);
+        let value_builder = PrimitiveBuilder::<UInt32Type>::new(2);
+        let mut builder = PrimitiveDictionaryBuilder::new(key_builder, value_builder);
+        for _ in 0..20 {
+            builder.append(1).unwrap();
+        }
+        let array = builder.finish();
+        assert_eq!(
+            "DictionaryArray {keys: [Some(0), Some(0), Some(0), Some(0), Some(0), Some(0), Some(0), Some(0), Some(0), Some(0)]... values: PrimitiveArray<UInt32>\n[\n  1,\n]}\n",
+            format!("{:?}", array)
+        );
+    }
+
+    #[test]
+    fn test_dictionary_array_from_iter() {
+        let test = vec!["a", "a", "b", "c"];
+        let array: DictionaryArray<Int8Type> = test
+            .iter()
+            .map(|&x| if x == "b" { None } else { Some(x) })
+            .collect();
+        assert_eq!(
+            "DictionaryArray {keys: [Some(0), Some(0), None, Some(1)] values: StringArray\n[\n  \"a\",\n  \"c\",\n]}\n",
+            format!("{:?}", array)
+        );
+
+        let array: DictionaryArray<Int8Type> = test.into_iter().collect();
+        assert_eq!(
+            "DictionaryArray {keys: [Some(0), Some(0), Some(1), Some(2)] values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
+            format!("{:?}", array)
+        );
+    }
 }
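The `NullableIter` added above reads keys through a raw pointer and checks nulls via `ArrayData::is_null`. A safe, stand-alone analogue is sketched below, assuming the Arrow validity-bitmap convention (one bit per slot, least-significant bit first, 1 meaning valid) and keys held in a plain slice; `NullableKeys` is a hypothetical name. Its `size_hint` reports the remaining length, as the `Iterator` documentation specifies.

```rust
/// Hypothetical analogue of a nullable key iterator: yields `Some(key)` for
/// valid slots and `None` for null slots, honouring an array offset that
/// applies to both the keys and the validity bitmap.
pub struct NullableKeys<'a, T> {
    keys: &'a [T],
    validity: Option<&'a [u8]>,
    offset: usize,
    pos: usize,
    len: usize,
}

impl<'a, T: Copy> NullableKeys<'a, T> {
    pub fn new(keys: &'a [T], validity: Option<&'a [u8]>, offset: usize, len: usize) -> Self {
        assert!(offset + len <= keys.len(), "slice out of bounds");
        NullableKeys { keys, validity, offset, pos: 0, len }
    }

    /// No bitmap means "no nulls"; otherwise test bit `i`, LSB first.
    fn is_valid(&self, i: usize) -> bool {
        match self.validity {
            None => true,
            Some(bits) => (bits[i / 8] >> (i % 8)) & 1 == 1,
        }
    }
}

impl<'a, T: Copy> Iterator for NullableKeys<'a, T> {
    type Item = Option<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.len {
            return None;
        }
        // Physical index = logical position + array offset.
        let i = self.offset + self.pos;
        self.pos += 1;
        Some(if self.is_valid(i) { Some(self.keys[i]) } else { None })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Only the items not yet consumed.
        let remaining = self.len - self.pos;
        (remaining, Some(remaining))
    }
}
```

With keys `[2, 3, 4]`, offset 1 and length 2 it yields `Some(3), Some(4)`, the behaviour `test_dictionary_array` checks for the non-zero-offset case.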
diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs
index c0d1044..bd73dbd 100644
--- a/rust/arrow/src/array/builder.rs
+++ b/rust/arrow/src/array/builder.rs
@@ -19,6 +19,7 @@
 //! internal buffer in an `ArrayData` object.
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::io::Write;
 use std::marker::PhantomData;
 use std::mem;
@@ -326,6 +327,27 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
         let data = builder.build();
         PrimitiveArray::<T>::from(data)
     }
+
+    /// Builds the `DictionaryArray` and resets this builder.
+    pub fn finish_dict(&mut self, values: ArrayRef) -> DictionaryArray<T> {
+        let len = self.len();
+        let null_bit_buffer = self.bitmap_builder.finish();
+        let null_count = len - bit_util::count_set_bits(null_bit_buffer.data());
+        let data_type = DataType::Dictionary(
+            Box::new(T::get_data_type()),
+            Box::new(values.data_type().clone()),
+        );
+        let mut builder = ArrayData::builder(data_type)
+            .len(len)
+            .add_buffer(self.values_builder.finish());
+        if null_count > 0 {
+            builder = builder
+                .null_count(null_count)
+                .null_bit_buffer(null_bit_buffer);
+        }
+        builder = builder.add_child_data(values.data());
+        DictionaryArray::<T>::from(builder.build())
+    }
 }
 
 ///  Array builder for `ListArray`
@@ -973,6 +995,195 @@ impl Drop for StructBuilder {
     }
 }
 
+/// Array builder for `DictionaryArray`. For example, to map a set of byte indices
+/// to f32 values. Note that the use of a `HashMap` here will not scale to very large
+/// arrays or result in an ordered dictionary.
+pub struct PrimitiveDictionaryBuilder<K, V>
+where
+    K: ArrowPrimitiveType,
+    V: ArrowPrimitiveType,
+{
+    keys_builder: PrimitiveBuilder<K>,
+    values_builder: PrimitiveBuilder<V>,
+    map: HashMap<Box<[u8]>, K::Native>,
+}
+
+impl<K, V> PrimitiveDictionaryBuilder<K, V>
+where
+    K: ArrowPrimitiveType,
+    V: ArrowPrimitiveType,
+{
+    /// Creates a new `PrimitiveDictionaryBuilder` from a keys builder and a value builder.
+    pub fn new(
+        keys_builder: PrimitiveBuilder<K>,
+        values_builder: PrimitiveBuilder<V>,
+    ) -> Self {
+        Self {
+            keys_builder: keys_builder,
+            values_builder: values_builder,
+            map: HashMap::new(),
+        }
+    }
+}
+
+impl<K, V> ArrayBuilder for PrimitiveDictionaryBuilder<K, V>
+where
+    K: ArrowPrimitiveType,
+    V: ArrowPrimitiveType,
+{
+    /// Returns the builder as a non-mutable `Any` reference.
+    fn as_any(&self) -> &Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    fn as_any_mut(&mut self) -> &mut Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<Any> {
+        self
+    }
+
+    /// Returns the number of array slots in the builder
+    fn len(&self) -> usize {
+        self.keys_builder.len()
+    }
+
+    /// Builds the array and resets this builder.
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+}
+
+impl<K, V> PrimitiveDictionaryBuilder<K, V>
+where
+    K: ArrowPrimitiveType,
+    V: ArrowPrimitiveType,
+{
+    /// Append a primitive value to the array. Return an existing index
+    /// if already present in the values array or a new index if the
+    /// value is appended to the values array.
+    pub fn append(&mut self, value: V::Native) -> Result<K::Native> {
+        if let Some(&key) = self.map.get(value.to_byte_slice()) {
+            // Append existing value.
+            self.keys_builder.append_value(key)?;
+            Ok(key)
+        } else {
+            // Append new value.
+            let key = K::Native::from_usize(self.values_builder.len())
+                .ok_or(ArrowError::DictionaryKeyOverflowError)?;
+            self.values_builder.append_value(value)?;
+            self.keys_builder.append_value(key as K::Native)?;
+            self.map.insert(value.to_byte_slice().into(), key);
+            Ok(key)
+        }
+    }
+
+    pub fn append_null(&mut self) -> Result<()> {
+        self.keys_builder.append_null()
+    }
+
+    /// Builds the `DictionaryArray` and resets this builder.
+    pub fn finish(&mut self) -> DictionaryArray<K> {
+        self.map.clear();
+        let value_ref: ArrayRef = Arc::new(self.values_builder.finish());
+        self.keys_builder.finish_dict(value_ref)
+    }
+}
+
+/// Array builder for `DictionaryArray` with string values, for example to map a set of
+/// integer keys to distinct strings. Note that the use of a `HashMap` here will not scale
+/// to very large arrays or result in an ordered dictionary.
+pub struct StringDictionaryBuilder<K>
+where
+    K: ArrowDictionaryKeyType,
+{
+    keys_builder: PrimitiveBuilder<K>,
+    values_builder: StringBuilder,
+    map: HashMap<Box<[u8]>, K::Native>,
+}
+
+impl<K> StringDictionaryBuilder<K>
+where
+    K: ArrowDictionaryKeyType,
+{
+    /// Creates a new `StringDictionaryBuilder` from a keys builder and a value builder.
+    pub fn new(keys_builder: PrimitiveBuilder<K>, values_builder: StringBuilder) -> Self {
+        Self {
+            keys_builder,
+            values_builder,
+            map: HashMap::new(),
+        }
+    }
+}
+
+impl<K> ArrayBuilder for StringDictionaryBuilder<K>
+where
+    K: ArrowDictionaryKeyType,
+{
+    /// Returns the builder as a non-mutable `Any` reference.
+    fn as_any(&self) -> &Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    fn as_any_mut(&mut self) -> &mut Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<Any> {
+        self
+    }
+
+    /// Returns the number of array slots in the builder
+    fn len(&self) -> usize {
+        self.keys_builder.len()
+    }
+
+    /// Builds the array and resets this builder.
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+}
+
+impl<K> StringDictionaryBuilder<K>
+where
+    K: ArrowDictionaryKeyType,
+{
+    /// Append a string value to the array. Return an existing index
+    /// if already present in the values array or a new index if the
+    /// value is appended to the values array.
+    pub fn append(&mut self, value: &str) -> Result<K::Native> {
+        if let Some(&key) = self.map.get(value.as_bytes()) {
+            // Append existing value.
+            self.keys_builder.append_value(key)?;
+            Ok(key)
+        } else {
+            // Append new value.
+            let key = K::Native::from_usize(self.values_builder.len())
+                .ok_or(ArrowError::DictionaryKeyOverflowError)?;
+            self.values_builder.append_value(value)?;
+            self.keys_builder.append_value(key as K::Native)?;
+            self.map.insert(value.as_bytes().into(), key);
+            Ok(key)
+        }
+    }
+
+    pub fn append_null(&mut self) -> Result<()> {
+        self.keys_builder.append_null()
+    }
+
+    /// Builds the `DictionaryArray` and resets this builder.
+    pub fn finish(&mut self) -> DictionaryArray<K> {
+        self.map.clear();
+        let value_ref: ArrayRef = Arc::new(self.values_builder.finish());
+        self.keys_builder.finish_dict(value_ref)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1808,4 +2019,70 @@ mod tests {
         let mut builder = StructBuilder::new(fields, field_builders);
         assert!(builder.field_builder::<BinaryBuilder>(0).is_none());
     }
+
+    #[test]
+    fn test_primitive_dictionary_builder() {
+        let key_builder = PrimitiveBuilder::<UInt8Type>::new(3);
+        let value_builder = PrimitiveBuilder::<UInt32Type>::new(2);
+        let mut builder = PrimitiveDictionaryBuilder::new(key_builder, value_builder);
+        builder.append(12345678).unwrap();
+        builder.append_null().unwrap();
+        builder.append(22345678).unwrap();
+        let array = builder.finish();
+
+        // Keys are strongly typed.
+        let aks: Vec<_> = array.keys().collect();
+
+        // Values are polymorphic and so require a downcast.
+        let av = array.values();
+        let ava: &UInt32Array = av.as_any().downcast_ref::<UInt32Array>().unwrap();
+        let avs: &[u32] = ava.value_slice(0, array.values().len());
+
+        assert_eq!(array.is_null(0), false);
+        assert_eq!(array.is_null(1), true);
+        assert_eq!(array.is_null(2), false);
+
+        assert_eq!(aks, vec![Some(0), None, Some(1)]);
+        assert_eq!(avs, &[12345678, 22345678]);
+    }
+
+    #[test]
+    fn test_string_dictionary_builder() {
+        let key_builder = PrimitiveBuilder::<Int8Type>::new(5);
+        let value_builder = StringBuilder::new(2);
+        let mut builder = StringDictionaryBuilder::new(key_builder, value_builder);
+        builder.append("abc").unwrap();
+        builder.append_null().unwrap();
+        builder.append("def").unwrap();
+        builder.append("def").unwrap();
+        builder.append("abc").unwrap();
+        let array = builder.finish();
+
+        // Keys are strongly typed.
+        let aks: Vec<_> = array.keys().collect();
+
+        // Values are polymorphic and so require a downcast.
+        let av = array.values();
+        let ava: &StringArray = av.as_any().downcast_ref::<StringArray>().unwrap();
+
+        assert_eq!(aks, vec![Some(0), None, Some(1), Some(1), Some(0)]);
+        assert_eq!(ava.value(0), "abc");
+        assert_eq!(ava.value(1), "def");
+    }
+
+    #[test]
+    fn test_primitive_dictionary_overflow() {
+        let key_builder = PrimitiveBuilder::<UInt8Type>::new(257);
+        let value_builder = PrimitiveBuilder::<UInt32Type>::new(257);
+        let mut builder = PrimitiveDictionaryBuilder::new(key_builder, value_builder);
+        // 256 unique keys.
+        for i in 0..256 {
+            builder.append(i + 1000).unwrap();
+        }
+        // Special error if the key overflows (the 257th unique value would need key 256)
+        assert_eq!(
+            builder.append(1257),
+            Err(ArrowError::DictionaryKeyOverflowError)
+        );
+    }
 }
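Both dictionary builders above follow one pattern: look the value up in a `HashMap`, reuse its key if present, otherwise assign `values.len()` as the next key and fail if that does not fit in the key type. A generic sketch of the pattern follows, using hypothetical `DictBuilder` and `KeyOverflow` types rather than the arrow builders. The diff keys its map on the value's byte representation (`Box<[u8]>`), presumably because float natives such as `f32` implement neither `Eq` nor `Hash`; this sketch sidesteps that by requiring `V: Eq + Hash`.

```rust
use std::collections::HashMap;
use std::convert::TryFrom;
use std::hash::Hash;
use std::mem;

/// Hypothetical error: the next distinct value needs a key that does not
/// fit in the key type.
#[derive(Debug, PartialEq)]
pub struct KeyOverflow;

/// Hypothetical minimal dictionary builder: deduplicates values with a
/// `HashMap` and assigns keys in first-seen order.
pub struct DictBuilder<K, V> {
    keys: Vec<Option<K>>,
    values: Vec<V>,
    map: HashMap<V, K>,
}

impl<K, V> DictBuilder<K, V>
where
    K: Copy + TryFrom<usize>,
    V: Clone + Eq + Hash,
{
    pub fn new() -> Self {
        DictBuilder { keys: Vec::new(), values: Vec::new(), map: HashMap::new() }
    }

    /// Appends `value`, reusing its key if already seen; otherwise assigns
    /// the next key, returning `KeyOverflow` (and appending nothing) if
    /// that key does not fit in `K`.
    pub fn append(&mut self, value: V) -> Result<K, KeyOverflow> {
        if let Some(&key) = self.map.get(&value) {
            self.keys.push(Some(key));
            return Ok(key);
        }
        let key = K::try_from(self.values.len()).map_err(|_| KeyOverflow)?;
        self.values.push(value.clone());
        self.map.insert(value, key);
        self.keys.push(Some(key));
        Ok(key)
    }

    /// A null slot only adds a null key; the values are untouched.
    pub fn append_null(&mut self) {
        self.keys.push(None);
    }

    /// Returns the keys and the distinct values, resetting the builder.
    pub fn finish(&mut self) -> (Vec<Option<K>>, Vec<V>) {
        self.map.clear();
        (mem::take(&mut self.keys), mem::take(&mut self.values))
    }
}
```

Appending `"abc"`, null, `"def"`, `"def"`, `"abc"` gives keys `[0, null, 1, 1, 0]` and values `["abc", "def"]`, mirroring `test_string_dictionary_builder`; with `u8` keys the 257th distinct value is rejected, mirroring `test_primitive_dictionary_overflow`.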
diff --git a/rust/arrow/src/array/equal.rs b/rust/arrow/src/array/equal.rs
index a33d6fa..81f62bf 100644
--- a/rust/arrow/src/array/equal.rs
+++ b/rust/arrow/src/array/equal.rs
@@ -216,6 +216,32 @@ impl ArrayEqual for ListArray {
     }
 }
 
+impl<T: ArrowPrimitiveType> ArrayEqual for DictionaryArray<T> {
+    fn equals(&self, other: &dyn Array) -> bool {
+        self.range_equals(other, 0, self.len(), 0)
+    }
+
+    default fn range_equals(
+        &self,
+        other: &dyn Array,
+        start_idx: usize,
+        end_idx: usize,
+        other_start_idx: usize,
+    ) -> bool {
+        assert!(other_start_idx + (end_idx - start_idx) <= other.len());
+        let other = other.as_any().downcast_ref::<DictionaryArray<T>>().unwrap();
+
+        let iter_a = self.keys().take(end_idx).skip(start_idx);
+        let iter_b = other.keys().skip(other_start_idx);
+
+        // For now, all the values must be the same
+        iter_a.eq(iter_b)
+            && self
+                .values()
+                .range_equals(&*other.values(), 0, other.values().len(), 0)
+    }
+}
+
 impl ArrayEqual for FixedSizeListArray {
     fn equals(&self, other: &dyn Array) -> bool {
         if !base_equal(&self.data(), &other.data()) {
@@ -796,6 +822,36 @@ impl PartialEq<ListArray> for Value {
     }
 }
 
+impl<T: ArrowPrimitiveType> JsonEqual for DictionaryArray<T> {
+    fn equals_json(&self, json: &[&Value]) -> bool {
+        self.keys().zip(json.iter()).all(|aj| match aj {
+            (None, Value::Null) => true,
+            (Some(a), Value::Number(j)) => {
+                a.to_usize().unwrap() as u64 == j.as_u64().unwrap()
+            }
+            _ => false,
+        })
+    }
+}
+
+impl<T: ArrowPrimitiveType> PartialEq<Value> for DictionaryArray<T> {
+    fn eq(&self, json: &Value) -> bool {
+        match json {
+            Value::Array(json_array) => self.equals_json_values(json_array),
+            _ => false,
+        }
+    }
+}
+
+impl<T: ArrowPrimitiveType> PartialEq<DictionaryArray<T>> for Value {
+    fn eq(&self, arrow: &DictionaryArray<T>) -> bool {
+        match self {
+            Value::Array(json_array) => arrow.equals_json_values(json_array),
+            _ => false,
+        }
+    }
+}
+
 impl JsonEqual for FixedSizeListArray {
     fn equals_json(&self, json: &[&Value]) -> bool {
         if self.len() != json.len() {
diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs
index b638677..fbe485a 100644
--- a/rust/arrow/src/array/mod.rs
+++ b/rust/arrow/src/array/mod.rs
@@ -70,6 +70,7 @@ pub use self::data::ArrayDataBuilder;
 pub use self::data::ArrayDataRef;
 
 pub use self::array::BinaryArray;
+pub use self::array::DictionaryArray;
 pub use self::array::FixedSizeBinaryArray;
 pub use self::array::FixedSizeListArray;
 pub use self::array::ListArray;
@@ -91,6 +92,15 @@ pub type UInt64Array = PrimitiveArray<UInt64Type>;
 pub type Float32Array = PrimitiveArray<Float32Type>;
 pub type Float64Array = PrimitiveArray<Float64Type>;
 
+pub type Int8DictionaryArray = DictionaryArray<Int8Type>;
+pub type Int16DictionaryArray = DictionaryArray<Int16Type>;
+pub type Int32DictionaryArray = DictionaryArray<Int32Type>;
+pub type Int64DictionaryArray = DictionaryArray<Int64Type>;
+pub type UInt8DictionaryArray = DictionaryArray<UInt8Type>;
+pub type UInt16DictionaryArray = DictionaryArray<UInt16Type>;
+pub type UInt32DictionaryArray = DictionaryArray<UInt32Type>;
+pub type UInt64DictionaryArray = DictionaryArray<UInt64Type>;
+
 pub type TimestampSecondArray = PrimitiveArray<TimestampSecondType>;
 pub type TimestampMillisecondArray = PrimitiveArray<TimestampMillisecondType>;
 pub type TimestampMicrosecondArray = PrimitiveArray<TimestampMicrosecondType>;
@@ -151,6 +161,7 @@ pub use self::builder::FixedSizeBinaryBuilder;
 pub use self::builder::FixedSizeListBuilder;
 pub use self::builder::ListBuilder;
 pub use self::builder::PrimitiveBuilder;
+pub use self::builder::PrimitiveDictionaryBuilder;
 pub use self::builder::StringBuilder;
 pub use self::builder::StructBuilder;
 
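The new `Int8DictionaryArray` ... `UInt64DictionaryArray` aliases differ only in their key type. Logically, element i of a dictionary array is `values[keys[i]]`, or null when the key slot is null. The sketch below decodes that representation using plain slices. `decode` is a hypothetical helper and is not part of the arrow crate. It assumes `i8` keys and string values.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: materializes a dictionary-encoded column.
/// Logical element i is `values[keys[i]]`, or null when `keys[i]` is null.
/// Returns `None` if any key is negative or out of range for `values`.
pub fn decode<'a>(keys: &[Option<i8>], values: &[&'a str]) -> Option<Vec<Option<&'a str>>> {
    keys.iter()
        .map(|key| match key {
            // A null key is a null slot; it does not index into `values`.
            None => Some(None),
            Some(k) => {
                // Negative keys cannot be converted to an index.
                let idx = usize::try_from(*k).ok()?;
                // An out-of-range key makes the whole column invalid.
                values.get(idx).map(|v| Some(*v))
            }
        })
        .collect()
}
```

Because values are shared, a column with many repeats of a few strings stores each string once plus one small integer per row.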
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 3256bb4..ae01873 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -78,6 +78,7 @@ pub enum DataType {
     List(Box<DataType>),
     FixedSizeList(Box<DataType>, i32),
     Struct(Vec<Field>),
+    Dictionary(Box<DataType>, Box<DataType>),
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
@@ -108,12 +109,24 @@ pub struct Field {
     name: String,
     data_type: DataType,
     nullable: bool,
+    dict_id: i64,
+    dict_is_ordered: bool,
 }
 
 pub trait ArrowNativeType:
     fmt::Debug + Send + Sync + Copy + PartialOrd + FromStr + 'static
 {
     fn into_json_value(self) -> Option<Value>;
+
+    /// Convert native type from usize.
+    fn from_usize(_: usize) -> Option<Self> {
+        None
+    }
+
+    /// Convert native type to usize.
+    fn to_usize(&self) -> Option<usize> {
+        None
+    }
 }
 
 /// Trait indicating a primitive fixed-width type (bool, ints and floats).
@@ -143,48 +156,112 @@ impl ArrowNativeType for i8 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for i16 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for i32 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for i64 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for u8 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for u16 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for u32 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for u64 {
     fn into_json_value(self) -> Option<Value> {
         Some(VNumber(Number::from(self)))
     }
+
+    fn from_usize(v: usize) -> Option<Self> {
+        num::FromPrimitive::from_usize(v)
+    }
+
+    fn to_usize(&self) -> Option<usize> {
+        num::ToPrimitive::to_usize(self)
+    }
 }
 
 impl ArrowNativeType for f32 {
@@ -340,6 +417,18 @@ make_type!(
     0i64
 );
 
+/// A subtype of primitive type that represents legal dictionary keys.
+/// See https://arrow.apache.org/docs/format/Columnar.html
+pub trait ArrowDictionaryKeyType: ArrowPrimitiveType {}
+
+impl ArrowDictionaryKeyType for Int8Type {}
+
+impl ArrowDictionaryKeyType for Int16Type {}
+
+impl ArrowDictionaryKeyType for Int32Type {}
+
+impl ArrowDictionaryKeyType for Int64Type {}
+
 /// A subtype of primitive type that represents numeric values.
 ///
 /// SIMD operations are defined in this trait if available on the target system.
@@ -832,6 +921,7 @@ impl DataType {
                 TimeUnit::Microsecond => "MICROSECOND",
                 TimeUnit::Nanosecond => "NANOSECOND",
             }}),
+            DataType::Dictionary(_, _) => json!({ "name": "dictionary"}),
         }
     }
 }
@@ -843,6 +933,25 @@ impl Field {
             name: name.to_string(),
             data_type,
             nullable,
+            dict_id: 0,
+            dict_is_ordered: false,
+        }
+    }
+
+    /// Creates a new dictionary-encoded field with the given dictionary id and ordering flag
+    pub fn new_dict(
+        name: &str,
+        data_type: DataType,
+        nullable: bool,
+        dict_id: i64,
+        dict_is_ordered: bool,
+    ) -> Self {
+        Field {
+            name: name.to_string(),
+            data_type,
+            nullable,
+            dict_id,
+            dict_is_ordered,
         }
     }
 
@@ -946,10 +1055,46 @@ impl Field {
                     },
                     _ => data_type,
                 };
+
+                let mut dict_id = 0;
+                let mut dict_is_ordered = false;
+
+                let data_type = match map.get("dictionary") {
+                    Some(dictionary) => {
+                        let index_type = match dictionary.get("indexType") {
+                            Some(t) => DataType::from(t)?,
+                            _ => {
+                                return Err(ArrowError::ParseError(
+                                    "Field missing 'indexType' attribute".to_string(),
+                                ));
+                            }
+                        };
+                        dict_id = match dictionary.get("id") {
+                            Some(Value::Number(n)) => n.as_i64().unwrap(),
+                            _ => {
+                                return Err(ArrowError::ParseError(
+                                    "Field missing 'id' attribute".to_string(),
+                                ));
+                            }
+                        };
+                        dict_is_ordered = match dictionary.get("isOrdered") {
+                            Some(&Value::Bool(n)) => n,
+                            _ => {
+                                return Err(ArrowError::ParseError(
+                                    "Field missing 'isOrdered' attribute".to_string(),
+                                ));
+                            }
+                        };
+                        DataType::Dictionary(Box::new(index_type), Box::new(data_type))
+                    }
+                    _ => data_type,
+                };
                 Ok(Field {
                     name,
                     nullable,
                     data_type,
+                    dict_id,
+                    dict_is_ordered,
                 })
             }
             _ => Err(ArrowError::ParseError(
@@ -972,12 +1117,25 @@ impl Field {
             }
             _ => vec![],
         };
-        json!({
-            "name": self.name,
-            "nullable": self.nullable,
-            "type": self.data_type.to_json(),
-            "children": children
-        })
+        match self.data_type() {
+            DataType::Dictionary(ref index_type, ref value_type) => json!({
+                "name": self.name,
+                "nullable": self.nullable,
+                "type": value_type.to_json(),
+                "children": children,
+                "dictionary": {
+                    "id": self.dict_id,
+                    "indexType": index_type.to_json(),
+                    "isOrdered": self.dict_is_ordered
+                }
+            }),
+            _ => json!({
+                "name": self.name,
+                "nullable": self.nullable,
+                "type": self.data_type.to_json(),
+                "children": children
+            }),
+        }
     }
 
     /// Converts to a `String` representation of the `Field`
@@ -1197,12 +1355,12 @@ mod tests {
 
         assert_eq!(
             "{\"Struct\":[\
-             {\"name\":\"first_name\",\"data_type\":\"Utf8\",\"nullable\":false},\
-             {\"name\":\"last_name\",\"data_type\":\"Utf8\",\"nullable\":false},\
+             {\"name\":\"first_name\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\
+             {\"name\":\"last_name\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\
              {\"name\":\"address\",\"data_type\":{\"Struct\":\
-             [{\"name\":\"street\",\"data_type\":\"Utf8\",\"nullable\":false},\
-             {\"name\":\"zip\",\"data_type\":\"UInt16\",\"nullable\":false}\
-             ]},\"nullable\":false}]}",
+             [{\"name\":\"street\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\
+             {\"name\":\"zip\",\"data_type\":\"UInt16\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false}\
+             ]},\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false}]}",
             serialized
         );
 
@@ -1408,6 +1566,16 @@ mod tests {
                 Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
                 Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
                 Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
+                Field::new_dict(
+                    "c31",
+                    DataType::Dictionary(
+                        Box::new(DataType::Int32),
+                        Box::new(DataType::Utf8),
+                    ),
+                    true,
+                    123,
+                    true,
+                ),
             ],
             metadata,
         );
@@ -1743,6 +1911,23 @@ mod tests {
                             "unit": "NANOSECOND"
                         },
                         "children": []
+                    },
+                    {
+                        "name": "c31",
+                        "nullable": true,
+                        "children": [],
+                        "type": {
+                          "name": "utf8"
+                        },
+                        "dictionary": {
+                          "id": 123,
+                          "indexType": {
+                            "name": "int",
+                            "isSigned": true,
+                            "bitWidth": 32
+                          },
+                          "isOrdered": true
+                        }
                     }
                 ],
                 "metadata" : {
@@ -1808,7 +1993,7 @@ mod tests {
                 false,
             ),
         ]);
-        assert_eq!(_person.to_string(), "first_name: Utf8, last_name: Utf8, address: Struct([Field { name: \"street\", data_type: Utf8, nullable: false }, Field { name: \"zip\", data_type: UInt16, nullable: false }])")
+        assert_eq!(_person.to_string(), "first_name: Utf8, last_name: Utf8, address: Struct([Field { name: \"street\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false }, Field { name: \"zip\", data_type: UInt16, nullable: false, dict_id: 0, dict_is_ordered: false }])")
     }
 
     #[test]
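For the integer types, `from_usize` and `to_usize` delegate to `num::FromPrimitive` and `num::ToPrimitive`. Both return `None` when a value is out of range, for example a negative `i32` key or a `usize` above `i8::MAX`. The sketch below reproduces that behavior using only the standard library's `TryFrom`. `UsizeIndex` and `impl_usize_index!` are hypothetical names chosen for illustration.

```rust
use std::convert::TryFrom;

/// Hypothetical trait with the same shape as `from_usize`/`to_usize`:
/// both conversions return `None` instead of wrapping when out of range.
pub trait UsizeIndex: Sized + Copy {
    fn from_usize(v: usize) -> Option<Self>;
    fn to_usize(&self) -> Option<usize>;
}

// Implements the checked conversions for each listed integer type.
macro_rules! impl_usize_index {
    ($($t:ty),*) => {
        $(impl UsizeIndex for $t {
            fn from_usize(v: usize) -> Option<Self> {
                <$t>::try_from(v).ok()
            }
            fn to_usize(&self) -> Option<usize> {
                usize::try_from(*self).ok()
            }
        })*
    };
}

impl_usize_index!(i8, i16, i32, i64, u8, u16, u32, u64);
```

The floating-point types keep the default `None`-returning methods in the commit. Only the integer types override them, which matches their use as dictionary keys.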
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index dc9f3c2..ff639d1 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -33,6 +33,7 @@ pub enum ArrowError {
     IoError(String),
     InvalidArgumentError(String),
     ParquetError(String),
+    DictionaryKeyOverflowError,
 }
 
 impl From<::std::io::Error> for ArrowError {
@@ -87,6 +88,9 @@ impl Display for ArrowError {
             &ArrowError::ParquetError(ref desc) => {
                 write!(f, "Parquet argument error: {}", desc)
             }
+            &ArrowError::DictionaryKeyOverflowError => {
+                write!(f, "Dictionary key exceeds the range of the key type")
+            }
         }
     }
 }
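`DictionaryKeyOverflowError` covers the case where a dictionary builder runs out of keys. With `i8` keys, only 128 distinct values (indices 0 to 127) can be addressed. Below is a sketch of a hypothetical std-only encoder that reports this condition. `dictionary_encode` and `KeyOverflow` are illustrative names. The sketch also assumes that nulls become null keys and never enter the values array.

```rust
use std::collections::HashMap;
use std::convert::TryFrom;

/// Hypothetical error type standing in for `ArrowError::DictionaryKeyOverflowError`.
#[derive(Debug, PartialEq)]
pub struct KeyOverflow;

/// Dictionary-encodes `input` into (keys, values). Null inputs get a null key
/// and are not added to `values`. Returns `KeyOverflow` when the index of a new
/// distinct value does not fit in the key type `K`.
pub fn dictionary_encode<K: TryFrom<usize> + Copy>(
    input: &[Option<&str>],
) -> Result<(Vec<Option<K>>, Vec<String>), KeyOverflow> {
    // Maps each distinct string to the key already assigned to it.
    let mut lookup: HashMap<&str, K> = HashMap::new();
    let mut values: Vec<String> = Vec::new();
    let mut keys: Vec<Option<K>> = Vec::with_capacity(input.len());
    for &item in input {
        let key = match item {
            None => None,
            Some(s) => match lookup.get(s).copied() {
                Some(k) => Some(k),
                None => {
                    // The new key is the position the value will occupy.
                    let k = K::try_from(values.len()).map_err(|_| KeyOverflow)?;
                    lookup.insert(s, k);
                    values.push(s.to_string());
                    Some(k)
                }
            },
        };
        keys.push(key);
    }
    Ok((keys, values))
}
```

Repeated strings reuse their existing key. The overflow check therefore fires only when a new distinct value appears, not when the row count grows.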
diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs
index 6d09ca4..1fecfff 100644
--- a/rust/arrow/src/flight/mod.rs
+++ b/rust/arrow/src/flight/mod.rs
@@ -72,12 +72,18 @@ pub fn flight_data_to_batch(
 ) -> Result<Option<RecordBatch>> {
     // check that the data_header is a record batch message
     let message = crate::ipc::get_root_as_message(&data.data_header[..]);
+    let dictionaries_by_field = Vec::new();
     let batch_header = message
         .header_as_record_batch()
         .ok_or(ArrowError::ParseError(
             "Unable to convert flight data header to a record batch".to_string(),
         ))?;
-    reader::read_record_batch(&data.data_body, batch_header, schema)
+    reader::read_record_batch(
+        &data.data_body,
+        batch_header,
+        schema,
+        &dictionaries_by_field,
+    )
 }
 
 // TODO: add more explicit conversion that exposes flight descriptor and metadata options
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index f53914e..71b4823 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -119,11 +119,21 @@ pub(crate) fn schema_to_fb_offset<'a: 'b, 'b>(
 /// Convert an IPC Field to Arrow Field
 impl<'a> From<ipc::Field<'a>> for Field {
     fn from(field: ipc::Field) -> Field {
-        Field::new(
-            field.name().unwrap(),
-            get_data_type(field),
-            field.nullable(),
-        )
+        if let Some(dictionary) = field.dictionary() {
+            Field::new_dict(
+                field.name().unwrap(),
+                get_data_type(field, true),
+                field.nullable(),
+                dictionary.id(),
+                dictionary.isOrdered(),
+            )
+        } else {
+            Field::new(
+                field.name().unwrap(),
+                get_data_type(field, true),
+                field.nullable(),
+            )
+        }
     }
 }
 
@@ -159,7 +169,28 @@ pub(crate) fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
 }
 
 /// Get the Arrow data type from the flatbuffer Field table
-pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
+pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType {
+    if let Some(dictionary) = field.dictionary() {
+        if may_be_dictionary {
+            let int = dictionary.indexType().unwrap();
+            let index_type = match (int.bitWidth(), int.is_signed()) {
+                (8, true) => DataType::Int8,
+                (8, false) => DataType::UInt8,
+                (16, true) => DataType::Int16,
+                (16, false) => DataType::UInt16,
+                (32, true) => DataType::Int32,
+                (32, false) => DataType::UInt32,
+                (64, true) => DataType::Int64,
+                (64, false) => DataType::UInt64,
+                _ => panic!("Unexpected bitwidth and signed"),
+            };
+            return DataType::Dictionary(
+                Box::new(index_type),
+                Box::new(get_data_type(field, false)),
+            );
+        }
+    }
+
     match field.type_type() {
         ipc::Type::Bool => DataType::Boolean,
         ipc::Type::Int => {
@@ -256,7 +287,7 @@ pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
             }
             let child_field = children.get(0);
             // returning int16 for now, to test, not sure how to get data type
-            DataType::List(Box::new(get_data_type(child_field)))
+            DataType::List(Box::new(get_data_type(child_field, false)))
         }
         ipc::Type::FixedSizeList => {
             let children = field.children().unwrap();
@@ -265,7 +296,10 @@ pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
             }
             let child_field = children.get(0);
             let fsl = field.type_as_fixed_size_list().unwrap();
-            DataType::FixedSizeList(Box::new(get_data_type(child_field)), fsl.listSize())
+            DataType::FixedSizeList(
+                Box::new(get_data_type(child_field, false)),
+                fsl.listSize(),
+            )
         }
         ipc::Type::Struct_ => {
             let mut fields = vec![];
@@ -545,6 +579,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
                 Some(children),
             )
         }
+        t => unimplemented!("Type {:?} not supported", t),
     }
 }
 
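`get_data_type` takes the bit width and signedness from the `Int` table of the dictionary's index type and maps them to the dictionary index type. Any other combination causes a panic. The sketch below performs the same mapping but returns a `Result`, so a malformed file could produce an error instead of a panic. `IndexType` is a hypothetical subset of `DataType`, and `bit_width` is assumed to be an `i32`, matching the flatbuffer `int` field.

```rust
/// Hypothetical subset of `DataType` covering only the integer index types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum IndexType {
    Int8,
    UInt8,
    Int16,
    UInt16,
    Int32,
    UInt32,
    Int64,
    UInt64,
}

/// Maps an IPC `Int { bitWidth, is_signed }` description to an index type,
/// returning an error for unsupported widths instead of panicking.
pub fn index_type_from_int(bit_width: i32, is_signed: bool) -> Result<IndexType, String> {
    match (bit_width, is_signed) {
        (8, true) => Ok(IndexType::Int8),
        (8, false) => Ok(IndexType::UInt8),
        (16, true) => Ok(IndexType::Int16),
        (16, false) => Ok(IndexType::UInt16),
        (32, true) => Ok(IndexType::Int32),
        (32, false) => Ok(IndexType::UInt32),
        (64, true) => Ok(IndexType::Int64),
        (64, false) => Ok(IndexType::UInt64),
        (w, s) => Err(format!(
            "unsupported dictionary index type: bitWidth={}, signed={}",
            w, s
        )),
    }
}
```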
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index 25ba0cd..632c354 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -20,13 +20,14 @@
 //! The `FileReader` and `StreamReader` have similar interfaces,
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
+use std::collections::HashMap;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use crate::array::*;
 use crate::buffer::Buffer;
 use crate::compute::cast;
-use crate::datatypes::{DataType, IntervalUnit, Schema, SchemaRef};
+use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
 use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};
@@ -54,6 +55,7 @@ fn create_array(
     data_type: &DataType,
     data: &Vec<u8>,
     buffers: &[ipc::Buffer],
+    dictionaries: &Vec<Option<ArrayRef>>,
     mut node_index: usize,
     mut buffer_index: usize,
 ) -> (ArrayRef, usize, usize) {
@@ -98,6 +100,7 @@ fn create_array(
                 list_data_type,
                 data,
                 buffers,
+                dictionaries,
                 node_index,
                 buffer_index,
             );
@@ -119,6 +122,7 @@ fn create_array(
                 list_data_type,
                 data,
                 buffers,
+                dictionaries,
                 node_index,
                 buffer_index,
             );
@@ -143,6 +147,7 @@ fn create_array(
                     struct_field.data_type(),
                     data,
                     buffers,
+                    dictionaries,
                     node_index,
                     buffer_index,
                 );
@@ -163,6 +168,24 @@ fn create_array(
             };
             Arc::new(struct_array)
         }
+        // Create dictionary array from RecordBatch
+        Dictionary(_, _) => {
+            let index_node = &nodes[node_index];
+            let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
+                .iter()
+                .map(|buf| read_buffer(buf, data))
+                .collect();
+            let value_array = dictionaries[node_index].clone().unwrap();
+            node_index += 1;
+            buffer_index += 2;
+
+            create_dictionary_array(
+                index_node,
+                data_type,
+                &index_buffers[..],
+                value_array,
+            )
+        }
         _ => {
             let array = create_primitive_array(
                 &nodes[node_index],
@@ -347,11 +370,38 @@ fn create_list_array(
     }
 }
 
+/// Reads the null bitmap and key buffers of a dictionary-encoded field and combines
+/// them with the previously read dictionary values into a dictionary array ref
+fn create_dictionary_array(
+    field_node: &ipc::FieldNode,
+    data_type: &DataType,
+    buffers: &[Buffer],
+    value_array: ArrayRef,
+) -> ArrayRef {
+    if let &DataType::Dictionary(_, _) = data_type {
+        let null_count = field_node.null_count() as usize;
+        let mut builder = ArrayData::builder(data_type.clone())
+            .len(field_node.length() as usize)
+            .buffers(buffers[1..2].to_vec())
+            .offset(0)
+            .child_data(vec![value_array.data()]);
+        if null_count > 0 {
+            builder = builder
+                .null_count(null_count)
+                .null_bit_buffer(buffers[0].clone())
+        }
+        make_array(builder.build())
+    } else {
+        unreachable!("Cannot create dictionary array from {:?}", data_type)
+    }
+}
+
 /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
 pub(crate) fn read_record_batch(
     buf: &Vec<u8>,
     batch: ipc::RecordBatch,
     schema: Arc<Schema>,
+    dictionaries: &Vec<Option<ArrayRef>>,
 ) -> Result<Option<RecordBatch>> {
     let buffers = batch.buffers().ok_or(ArrowError::IoError(
         "Unable to get buffers from IPC RecordBatch".to_string(),
@@ -371,6 +421,7 @@ pub(crate) fn read_record_batch(
             field.data_type(),
             &buf,
             buffers,
+            dictionaries,
             node_index,
             buffer_index,
         );
@@ -382,20 +433,43 @@ pub(crate) fn read_record_batch(
     RecordBatch::try_new(schema.clone(), arrays).map(|batch| Some(batch))
 }
 
+// Linear search for the index of the first field that uses the given dictionary id.
+fn find_dictionary_field(ipc_schema: &ipc::Schema, id: i64) -> Option<usize> {
+    let fields = ipc_schema.fields().unwrap();
+    for i in 0..fields.len() {
+        let field: ipc::Field = fields.get(i);
+        if let Some(dictionary) = field.dictionary() {
+            if dictionary.id() == id {
+                return Some(i);
+            }
+        }
+    }
+    None
+}
+
 /// Arrow File reader
 pub struct FileReader<R: Read + Seek> {
     /// Buffered file reader that supports reading and seeking
     reader: BufReader<R>,
+
     /// The schema that is read from the file header
     schema: Arc<Schema>,
+
     /// The blocks in the file
     ///
     /// A block indicates the regions in the file to read to get data
     blocks: Vec<ipc::Block>,
+
     /// A counter to keep track of the current block that should be read
     current_block: usize,
+
     /// The total number of blocks, which may contain record batches and other types
     total_blocks: usize,
+
+    /// Optional dictionaries for each schema field.
+    ///
+    /// Dictionaries may be appended to in the streaming format.
+    dictionaries_by_field: Vec<Option<ArrayRef>>,
 }
 
 impl<R: Read + Seek> FileReader<R> {
@@ -420,24 +494,6 @@ impl<R: Read + Seek> FileReader<R> {
                 "Arrow file does not contain correct footer".to_string(),
             ));
         }
-        reader.seek(SeekFrom::Start(8))?;
-        // determine metadata length
-        let mut meta_size: [u8; 4] = [0; 4];
-        reader.read_exact(&mut meta_size)?;
-        let meta_len = u32::from_le_bytes(meta_size);
-
-        let mut meta_buffer = vec![0; meta_len as usize];
-        reader.seek(SeekFrom::Start(12))?;
-        reader.read_exact(&mut meta_buffer)?;
-
-        let vecs = &meta_buffer.to_vec();
-        let message = ipc::get_root_as_message(vecs);
-        // message header is a Schema, so read it
-        let ipc_schema: ipc::Schema =
-            message.header_as_schema().ok_or(ArrowError::IoError(
-                "Unable to Unable to read IPC message as schema".to_string(),
-            ))?;
-        let schema = ipc::convert::fb_to_schema(ipc_schema);
 
         // what does the footer contain?
         let mut footer_size: [u8; 4] = [0; 4];
@@ -457,12 +513,97 @@ impl<R: Read + Seek> FileReader<R> {
 
         let total_blocks = blocks.len();
 
+        let ipc_schema = footer.schema().unwrap();
+        let schema = ipc::convert::fb_to_schema(ipc_schema);
+
+        // Create an array of optional dictionary value arrays, one per field.
+        let mut dictionaries_by_field = vec![None; schema.fields().len()];
+        for block in footer.dictionaries().unwrap() {
+            // metaDataLength includes the 4-byte length prefix, which is skipped below
+            let meta_len = block.metaDataLength() - 4;
+
+            let mut block_data = vec![0; meta_len as usize];
+            reader.seek(SeekFrom::Start(block.offset() as u64 + 4))?;
+            reader.read_exact(&mut block_data)?;
+
+            let message = ipc::get_root_as_message(&block_data[..]);
+
+            match message.header_type() {
+                ipc::MessageHeader::DictionaryBatch => {
+                    let batch = message.header_as_dictionary_batch().unwrap();
+
+                    // read the block that makes up the dictionary batch into a buffer
+                    let mut buf = vec![0; block.bodyLength() as usize];
+                    reader.seek(SeekFrom::Start(
+                        block.offset() as u64 + block.metaDataLength() as u64,
+                    ))?;
+                    reader.read_exact(&mut buf)?;
+
+                    if batch.isDelta() {
+                        panic!("delta dictionary batches not supported");
+                    }
+
+                    let id = batch.id();
+
+                    // As the dictionary batch does not contain the type of the
+                    // values array, we need to retrieve it from the schema.
+                    let first_field = find_dictionary_field(&ipc_schema, id)
+                        .expect("dictionary id not found in schema");
+
+                    // Get an array representing this dictionary's values.
+                    let dictionary_values: ArrayRef =
+                        match schema.field(first_field).data_type() {
+                            DataType::Dictionary(_, ref value_type) => {
+                                // Make a fake schema for the dictionary batch.
+                                let schema = Schema {
+                                    fields: vec![Field::new(
+                                        "",
+                                        value_type.as_ref().clone(),
+                                        false,
+                                    )],
+                                    metadata: HashMap::new(),
+                                };
+                                // Read a single column
+                                let record_batch = read_record_batch(
+                                    &buf,
+                                    batch.data().unwrap(),
+                                    Arc::new(schema),
+                                    &dictionaries_by_field,
+                                )?
+                                .unwrap();
+                                Some(record_batch.column(0).clone())
+                            }
+                            _ => None,
+                        }
+                        .expect("dictionary id not found in schema");
+
+                    // For every field with this dictionary id, update the dictionaries vector
+                    // in the reader. Note that a dictionary batch may be shared between many fields.
+                    // The isOrdered flag is not currently recorded on the arrays; it could become
+                    // a general attribute of arrays.
+                    let fields = ipc_schema.fields().unwrap();
+                    for i in 0..fields.len() {
+                        let field: ipc::Field = fields.get(i);
+                        if let Some(dictionary) = field.dictionary() {
+                            if dictionary.id() == id {
+                                // Add (possibly multiple) array refs to the dictionaries array.
+                                dictionaries_by_field[i] =
+                                    Some(dictionary_values.clone());
+                            }
+                        }
+                    }
+                }
+                _ => panic!("Expecting DictionaryBatch in dictionary blocks."),
+            };
+        }
+
         Ok(Self {
             reader,
             schema: Arc::new(schema),
             blocks: blocks.to_vec(),
             current_block: 0,
             total_blocks,
+            dictionaries_by_field,
         })
     }
 
@@ -511,7 +652,12 @@ impl<R: Read + Seek> FileReader<R> {
                     ))?;
                     self.reader.read_exact(&mut buf)?;
 
-                    read_record_batch(&buf, batch, self.schema())
+                    read_record_batch(
+                        &buf,
+                        batch,
+                        self.schema(),
+                        &self.dictionaries_by_field,
+                    )
                 }
                 _ => {
                     return Err(ArrowError::IoError(
@@ -561,6 +707,11 @@ pub struct StreamReader<R: Read> {
     ///
     /// This value is set to `true` the first time the reader's `next()` returns `None`.
     finished: bool,
+
+    /// Optional dictionaries for each schema field.
+    ///
+    /// Dictionaries may be appended to in the streaming format.
+    dictionaries_by_field: Vec<Option<ArrayRef>>,
 }
 
 impl<R: Read> StreamReader<R> {
@@ -587,10 +738,14 @@ impl<R: Read> StreamReader<R> {
         )?;
         let schema = ipc::convert::fb_to_schema(ipc_schema);
 
+        // Create an array of optional dictionary value arrays, one per field.
+        let dictionaries_by_field = vec![None; schema.fields().len()];
+
         Ok(Self {
             reader,
             schema: Arc::new(schema),
             finished: false,
+            dictionaries_by_field,
         })
     }
 
@@ -636,7 +791,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = vec![0; message.bodyLength() as usize];
                 self.reader.read_exact(&mut buf)?;
 
-                read_record_batch(&buf, batch, self.schema())
+                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field)
             }
             _ => {
                 return Err(ArrowError::IoError(
@@ -680,6 +835,7 @@ mod tests {
         let paths = vec![
             "generated_interval",
             "generated_datetime",
+            "generated_dictionary",
             "generated_nested",
             "generated_primitive_no_batches",
             "generated_primitive_zerolength",
@@ -707,6 +863,7 @@ mod tests {
         let paths = vec![
             "generated_interval",
             "generated_datetime",
+            // "generated_dictionary",
             "generated_nested",
             "generated_primitive_no_batches",
             "generated_primitive_zerolength",
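When a file is read, each dictionary batch carries an id. Every schema field whose dictionary metadata uses that id receives the same values array, so one dictionary can back several columns. Below is a minimal generic sketch of that fan-out. It assumes the per-field dictionary ids have already been collected into a slice. `assign_dictionary` is a hypothetical helper. It returns the number of fields updated, which lets a caller detect an id that no field references.

```rust
/// Hypothetical helper: `field_dict_ids[i]` is the dictionary id referenced by
/// schema field i (`None` for non-dictionary fields). Every field that references
/// `id` receives a clone of the shared `values`. Returns how many fields matched.
pub fn assign_dictionary<V: Clone>(
    field_dict_ids: &[Option<i64>],
    dictionaries_by_field: &mut [Option<V>],
    id: i64,
    values: V,
) -> usize {
    let mut assigned = 0;
    for (slot, field_id) in dictionaries_by_field.iter_mut().zip(field_dict_ids) {
        if *field_id == Some(id) {
            *slot = Some(values.clone());
            assigned += 1;
        }
    }
    assigned
}
```

With `V = ArrayRef` (an `Arc`), each per-field clone only increments a reference count, so sharing a large dictionary across fields is cheap.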
diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs
index 7f26511..0da21df 100644
--- a/rust/arrow/src/util/integration_util.rs
+++ b/rust/arrow/src/util/integration_util.rs
@@ -31,6 +31,7 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
 pub(crate) struct ArrowJson {
     schema: ArrowJsonSchema,
     batches: Vec<ArrowJsonBatch>,
+    dictionaries: Option<Vec<ArrowJsonDictionaryBatch>>,
 }
 
 /// A struct that partially reads the Arrow JSON schema.
@@ -48,6 +49,14 @@ struct ArrowJsonBatch {
     columns: Vec<ArrowJsonColumn>,
 }
 
+/// A struct that partially reads the Arrow JSON dictionary batch
+#[derive(Deserialize)]
+#[allow(non_snake_case)]
+struct ArrowJsonDictionaryBatch {
+    id: i64,
+    data: ArrowJsonBatch,
+}
+
 /// A struct that partially reads the Arrow JSON column/array
 #[derive(Deserialize, Clone, Debug)]
 struct ArrowJsonColumn {
@@ -227,6 +236,81 @@ impl ArrowJsonBatch {
                         let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
                         arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
                     }
+                    DataType::Dictionary(ref key_type, _) => match key_type.as_ref() {
+                        DataType::Int8 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<Int8DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::Int16 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<Int16DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::Int32 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<Int32DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::Int64 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<Int64DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::UInt8 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<UInt8DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::UInt16 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<UInt16DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::UInt32 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<UInt32DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        DataType::UInt64 => {
+                            let arr = arr
+                                .as_any()
+                                .downcast_ref::<UInt64DictionaryArray>()
+                                .unwrap();
+                            arr.equals_json(
+                                &json_array.iter().collect::<Vec<&Value>>()[..],
+                            )
+                        }
+                        t @ _ => panic!("Unsupported dictionary comparison for {:?}", t),
+                    },
                     t @ _ => panic!("Unsupported comparison for {:?}", t),
                 }
             })