You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/17 16:50:41 UTC

[arrow] branch master updated: ARROW-2583: [Rust] Buffer should be typeless

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1209a80  ARROW-2583: [Rust] Buffer should be typeless
1209a80 is described below

commit 1209a80fc6384428f43eac20804a01e0196638d3
Author: Chao Sun <su...@uber.com>
AuthorDate: Fri Aug 17 12:50:32 2018 -0400

    ARROW-2583: [Rust] Buffer should be typeless
    
    This changes the existing `Buffer` class to be non-generic over type `T`, since a `Buffer` class should just represent a plain byte array and interpretation of the data within the buffer should be done on a higher level, such as in `Array`.
    
    While working on this, I found that I also need to make significant changes on the `Array` and `List` types, since they are currently heavily tied with the `Buffer<T>` implementation. The new implementation follows arrow-cpp and defines a `ArrayData` struct which provides the common operations on a Arrow array. Subtypes of `Array` then provide specific operations for the types they represent. For instance, one can get a primitive value at index `i` for `PrimitiveArray` type, or can ge [...]
    
    I removed `List` since it's no longer necessary. Removed `PrimitiveArray::{min,max}` for now but plan to add them back.
    
    Author: Chao Sun <su...@uber.com>
    
    Closes #2330 from sunchao/ARROW-2583 and squashes the following commits:
    
    91c580b8 <Chao Sun> Fix lint
    0e8a8dc9 <Chao Sun> Address review comments
    21b8d1df <Chao Sun> Fix lint
    2493d122 <Chao Sun> Fix a few more issues and add more tests
    383cc3ef <Chao Sun> More refactoring
    2ee3cf95 <Chao Sun> Fix lint
    a29ae4a2 <Chao Sun> Fix test for is_aligned
    c1941651 <Chao Sun> Fix Buffer offset and test for Array alignment
    a3206cc5 <Chao Sun> Address review comments
    18634481 <Chao Sun> Fix lint
    1e8dab51 <Chao Sun> In is_aligned(), should use align_of instead of size_of
    363e7cfc <Chao Sun> Fix bench. Change Buffer#copy() to Buffer#clone()
    042796b4 <Chao Sun> Add check for pointer alignment
    18e5dead <Chao Sun> Address comments
    51327fed <Chao Sun> Address comments
    ac782f14 <Chao Sun> Remove commented out code
    08fb8479 <Chao Sun> Fix to_bytes() collision and test failure
    c3c0f6c5 <Chao Sun> Fix style
    83e1a1fd <Chao Sun> Bring back min and max for PrimitiveArray
    7e57fd0d <Chao Sun>  ARROW-2583:  Buffer should be typeless
---
 rust/Cargo.toml                                    |    5 +-
 rust/benches/array_from_builder.rs                 |   49 -
 rust/benches/array_from_vec.rs                     |   12 +-
 rust/examples/dynamic_types.rs                     |   23 +-
 rust/src/array.rs                                  | 1001 ++++++++++++++------
 rust/src/array_data.rs                             |  295 ++++++
 rust/src/bitmap.rs                                 |   42 +-
 rust/src/buffer.rs                                 |  358 +++----
 rust/src/builder.rs                                |   15 +-
 rust/src/datatypes.rs                              |   32 +-
 rust/src/lib.rs                                    |    6 +-
 rust/src/list.rs                                   |  110 ---
 rust/src/list_builder.rs                           |  101 --
 rust/src/memory.rs                                 |   40 +-
 rust/src/memory_pool.rs                            |    4 +-
 rust/src/record_batch.rs                           |   35 +-
 rust/src/util/bit_util.rs                          |  181 ++++
 .../array_from_vec.rs => src/util/mod.rs}          |   11 +-
 .../util/test_util.rs}                             |   30 +-
 19 files changed, 1478 insertions(+), 872 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 7cedb15..b618ea9 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -37,6 +37,7 @@ path = "src/lib.rs"
 bytes = "0.4"
 libc = "0.2"
 serde_json = "1.0.13"
+rand = "0.5"
 
 [dev-dependencies]
 criterion = "0.2"
@@ -44,7 +45,3 @@ criterion = "0.2"
 [[bench]]
 name = "array_from_vec"
 harness = false
-
-[[bench]]
-name = "array_from_builder"
-harness = false
\ No newline at end of file
diff --git a/rust/benches/array_from_builder.rs b/rust/benches/array_from_builder.rs
deleted file mode 100644
index 2398e2e..0000000
--- a/rust/benches/array_from_builder.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[macro_use]
-extern crate criterion;
-
-use criterion::Criterion;
-
-extern crate arrow;
-
-use arrow::array::*;
-use arrow::builder::*;
-
-fn array_from_builder(n: usize) {
-    let mut v: Builder<i32> = Builder::with_capacity(n);
-    for i in 0..n {
-        v.push(i as i32);
-    }
-    PrimitiveArray::from(v.finish());
-}
-
-fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("array_from_builder 128", |b| {
-        b.iter(|| array_from_builder(128))
-    });
-    c.bench_function("array_from_builder 256", |b| {
-        b.iter(|| array_from_builder(256))
-    });
-    c.bench_function("array_from_builder 512", |b| {
-        b.iter(|| array_from_builder(512))
-    });
-}
-
-criterion_group!(benches, criterion_benchmark);
-criterion_main!(benches);
diff --git a/rust/benches/array_from_vec.rs b/rust/benches/array_from_vec.rs
index 50f0c52..509628d 100644
--- a/rust/benches/array_from_vec.rs
+++ b/rust/benches/array_from_vec.rs
@@ -23,13 +23,19 @@ use criterion::Criterion;
 extern crate arrow;
 
 use arrow::array::*;
+use arrow::array_data::ArrayDataBuilder;
+use arrow::buffer::Buffer;
+use arrow::datatypes::*;
 
 fn array_from_vec(n: usize) {
-    let mut v: Vec<i32> = Vec::with_capacity(n);
+    let mut v: Vec<u8> = Vec::with_capacity(n);
     for i in 0..n {
-        v.push(i as i32);
+        v.push((i & 0xffff) as u8);
     }
-    PrimitiveArray::from(v);
+    let arr_data = ArrayDataBuilder::new(DataType::Int32)
+        .add_buffer(Buffer::from(v))
+        .build();
+    criterion::black_box(PrimitiveArray::<i32>::from(arr_data));
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
diff --git a/rust/examples/dynamic_types.rs b/rust/examples/dynamic_types.rs
index b8093bf..678564e 100644
--- a/rust/examples/dynamic_types.rs
+++ b/rust/examples/dynamic_types.rs
@@ -43,9 +43,18 @@ fn main() {
     let id = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
 
     let nested = StructArray::from(vec![
-        Arc::new(ListArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<Array>,
-        Arc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
-        Arc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
+        (
+            Field::new("a", DataType::Utf8, false),
+            Arc::new(BinaryArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<Array>,
+        ),
+        (
+            Field::new("b", DataType::Float64, false),
+            Arc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])),
+        ),
+        (
+            Field::new("c", DataType::Float64, false),
+            Arc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])),
+        ),
     ]);
 
     // build a record batch
@@ -54,7 +63,7 @@ fn main() {
     process(&batch);
 }
 
-/// Create a new batch by performing a projection of id, (nested.b + nested.c) AS sum
+/// Create a new batch by performing a projection of id, nested.c
 fn process(batch: &RecordBatch) {
     let id = batch.column(0);
     let nested = batch
@@ -63,12 +72,12 @@ fn process(batch: &RecordBatch) {
         .downcast_ref::<StructArray>()
         .unwrap();
 
-    let nested_b = nested
+    let _nested_b = nested
         .column(1)
         .as_any()
         .downcast_ref::<PrimitiveArray<f64>>()
         .unwrap();
-    let nested_c = nested
+    let nested_c: &PrimitiveArray<f64> = nested
         .column(2)
         .as_any()
         .downcast_ref::<PrimitiveArray<f64>>()
@@ -83,7 +92,7 @@ fn process(batch: &RecordBatch) {
         Arc::new(projected_schema),
         vec![
             id.clone(), //NOTE: this is cloning the Arc not the array data
-            Arc::new(nested_b.add(nested_c)),
+            Arc::new(PrimitiveArray::<f64>::from(nested_c.data())),
         ],
     );
 }
diff --git a/rust/src/array.rs b/rust/src/array.rs
index 1c4322c..c5d7ee1 100644
--- a/rust/src/array.rs
+++ b/rust/src/array.rs
@@ -18,444 +18,839 @@
 ///! Array types
 use std::any::Any;
 use std::convert::From;
-use std::ops::Add;
-use std::str;
-use std::string::String;
+use std::mem;
+use std::ptr;
 use std::sync::Arc;
 
-use super::bitmap::Bitmap;
-use super::buffer::*;
-use super::builder::*;
-use super::datatypes::*;
-use super::list::*;
-use super::list_builder::*;
+use array_data::*;
+use buffer::*;
+use datatypes::*;
+use memory;
+use util::bit_util;
 
-/// Trait for dealing with different types of Array at runtime when the type of the
+/// Trait for dealing with different types of array at runtime when the type of the
 /// array is not known in advance
 pub trait Array: Send + Sync {
-    /// Returns the length of the array (number of items in the array)
-    fn len(&self) -> usize;
-    /// Returns the number of null values in the array
-    fn null_count(&self) -> usize;
-    /// Optional validity bitmap (can be None if there are no null values)
-    fn validity_bitmap(&self) -> &Option<Bitmap>;
-    /// Return the array as Any so that it can be downcast to a specific implementation
+    /// Returns the array as `Any` so that it can be downcast to a specific implementation
     fn as_any(&self) -> &Any;
-}
 
-/// Array of List<T>
-pub struct ListArray<T: ArrowPrimitiveType> {
-    len: usize,
-    data: List<T>,
-    null_count: usize,
-    validity_bitmap: Option<Bitmap>,
-}
+    /// Returns a reference-counted pointer to the data of this array
+    fn data(&self) -> ArrayDataRef;
+
+    /// Returns a borrowed & reference-counted pointer to the data of this array
+    fn data_ref(&self) -> &ArrayDataRef;
 
-impl<T> ListArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    pub fn len(&self) -> usize {
-        self.len
+    /// Returns a reference to the data type of this array
+    fn data_type(&self) -> &DataType {
+        self.data_ref().data_type()
     }
 
-    pub fn null_count(&self) -> usize {
-        self.null_count
+    /// Returns the length (i.e., number of elements) of this array
+    fn len(&self) -> i64 {
+        self.data().len()
     }
 
-    pub fn validity_bitmap(&self) -> &Option<Bitmap> {
-        &self.validity_bitmap
+    /// Returns the offset of this array
+    fn offset(&self) -> i64 {
+        self.data().offset()
     }
 
-    pub fn get(&self, i: usize) -> &[T] {
-        self.data.get(i)
+    /// Returns whether the element at index `i` is null
+    fn is_null(&self, i: i64) -> bool {
+        self.data().is_null(i)
     }
 
-    pub fn list(&self) -> &List<T> {
-        &self.data
+    /// Returns whether the element at index `i` is not null
+    fn is_valid(&self, i: i64) -> bool {
+        self.data().is_valid(i)
     }
-}
 
-/// Create a ListArray<T> from a List<T> without null values
-impl<T> From<List<T>> for ListArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn from(list: List<T>) -> Self {
-        let len = list.len();
-        ListArray {
-            len,
-            data: list,
-            validity_bitmap: None,
-            null_count: 0,
-        }
+    /// Returns the total number of nulls in this array
+    fn null_count(&self) -> i64 {
+        self.data().null_count()
     }
 }
 
-/// Create ListArray<u8> from Vec<&'static str>
-impl From<Vec<&'static str>> for ListArray<u8> {
-    fn from(v: Vec<&'static str>) -> Self {
-        let mut builder: ListBuilder<u8> = ListBuilder::with_capacity(v.len());
-        for s in v {
-            builder.push(s.as_bytes())
-        }
-        ListArray::from(builder.finish())
+pub type ArrayRef = Arc<Array>;
+
+/// Constructs an array using the input `data`. Returns a reference-counted `Array`
+/// instance.
+fn make_array(data: ArrayDataRef) -> ArrayRef {
+    // TODO: here data_type() needs to clone the type - maybe add a type tag enum to
+    // avoid the cloning.
+    match data.data_type().clone() {
+        DataType::Boolean => Arc::new(PrimitiveArray::<bool>::from(data)) as ArrayRef,
+        DataType::Int8 => Arc::new(PrimitiveArray::<i8>::from(data)) as ArrayRef,
+        DataType::Int16 => Arc::new(PrimitiveArray::<i16>::from(data)) as ArrayRef,
+        DataType::Int32 => Arc::new(PrimitiveArray::<i32>::from(data)) as ArrayRef,
+        DataType::Int64 => Arc::new(PrimitiveArray::<i64>::from(data)) as ArrayRef,
+        DataType::UInt8 => Arc::new(PrimitiveArray::<u8>::from(data)) as ArrayRef,
+        DataType::UInt16 => Arc::new(PrimitiveArray::<u16>::from(data)) as ArrayRef,
+        DataType::UInt32 => Arc::new(PrimitiveArray::<u32>::from(data)) as ArrayRef,
+        DataType::UInt64 => Arc::new(PrimitiveArray::<u64>::from(data)) as ArrayRef,
+        DataType::Float32 => Arc::new(PrimitiveArray::<f32>::from(data)) as ArrayRef,
+        DataType::Float64 => Arc::new(PrimitiveArray::<f64>::from(data)) as ArrayRef,
+        DataType::Utf8 => Arc::new(BinaryArray::from(data)) as ArrayRef,
+        DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef,
+        DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef,
+        dt => panic!("Unexpected data type {:?}", dt),
     }
 }
 
-/// Create ListArray<u8> from Vec<String>
-impl From<Vec<String>> for ListArray<u8> {
-    fn from(v: Vec<String>) -> Self {
-        let mut builder: ListBuilder<u8> = ListBuilder::with_capacity(v.len());
-        for s in v {
-            builder.push(s.as_bytes())
-        }
-        ListArray::from(builder.finish())
-    }
+/// ----------------------------------------------------------------------------
+/// Implementations of different array types
+
+struct RawPtrBox<T> {
+    inner: *const T,
 }
 
-impl<T> Array for ListArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn len(&self) -> usize {
-        self.len
+impl<T> RawPtrBox<T> {
+    fn new(inner: *const T) -> Self {
+        Self { inner: inner }
     }
-    fn null_count(&self) -> usize {
-        self.null_count
-    }
-    fn validity_bitmap(&self) -> &Option<Bitmap> {
-        &self.validity_bitmap
-    }
-    fn as_any(&self) -> &Any {
-        self
+
+    fn get(&self) -> *const T {
+        self.inner
     }
 }
 
-/// Array of T
+unsafe impl<T> Send for RawPtrBox<T> {}
+unsafe impl<T> Sync for RawPtrBox<T> {}
+
+/// Array whose elements are of primitive types.
 pub struct PrimitiveArray<T: ArrowPrimitiveType> {
-    len: usize,
-    data: Buffer<T>,
-    null_count: usize,
-    validity_bitmap: Option<Bitmap>,
+    data: ArrayDataRef,
+    /// Pointer to the value array. The lifetime of this must be <= to the value buffer
+    /// stored in `data`, so it's safe to store.
+    raw_values: RawPtrBox<T>,
 }
 
-impl<T> PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    pub fn new(data: Buffer<T>, null_count: usize, validity_bitmap: Option<Bitmap>) -> Self {
-        PrimitiveArray {
-            len: data.len(),
-            data: data,
-            null_count,
-            validity_bitmap,
-        }
-    }
+/// Macro to define primitive arrays for different data types and native types.
+macro_rules! def_primitive_array {
+    ($data_ty:path, $native_ty:ident) => {
+        impl PrimitiveArray<$native_ty> {
+            pub fn new(length: i64, values: Buffer, null_count: i64, offset: i64) -> Self {
+                let array_data = ArrayData::builder($data_ty)
+                    .len(length)
+                    .add_buffer(values)
+                    .null_count(null_count)
+                    .offset(offset)
+                    .build();
+                PrimitiveArray::from(array_data)
+            }
 
-    pub fn len(&self) -> usize {
-        self.len
-    }
+            /// Returns a `Buffer` holds all the values of this array.
+            ///
+            /// Note this doesn't take account into the offset of this array.
+            pub fn values(&self) -> Buffer {
+                self.data.buffers()[0].clone()
+            }
 
-    pub fn get(&self, i: usize) -> &T {
-        self.data.get(i)
-    }
+            /// Returns a raw pointer to the values of this array.
+            pub fn raw_values(&self) -> *const $native_ty {
+                unsafe { mem::transmute(self.raw_values.get().offset(self.data.offset() as isize)) }
+            }
 
-    pub fn iter(&self) -> BufferIterator<T> {
-        self.data.iter()
-    }
+            /// Returns the primitive value at index `i`.
+            ///
+            /// Note this doesn't do any bound checking, for performance reason.
+            pub fn value(&self, i: i64) -> $native_ty {
+                unsafe { *(self.raw_values().offset(i as isize)) }
+            }
 
-    pub fn buffer(&self) -> &Buffer<T> {
-        &self.data
-    }
+            /// Returns the minimum value in the array, according to the natural order.
+            pub fn min(&self) -> Option<$native_ty> {
+                self.min_max_helper(|a, b| a < b)
+            }
 
-    /// Determine the minimum value in the array
-    pub fn min(&self) -> Option<T> {
-        let mut n: Option<T> = None;
-        match &self.validity_bitmap {
-            &Some(ref bitmap) => for i in 0..self.len {
-                if bitmap.is_set(i) {
-                    let mut m = self.data.get(i);
+            /// Returns the maximum value in the array, according to the natural order.
+            pub fn max(&self) -> Option<$native_ty> {
+                self.min_max_helper(|a, b| a > b)
+            }
+
+            fn min_max_helper<F>(&self, cmp: F) -> Option<$native_ty>
+            where
+                F: Fn($native_ty, $native_ty) -> bool,
+            {
+                let mut n: Option<$native_ty> = None;
+                let data = self.data();
+                for i in 0..data.len() {
+                    if data.is_null(i) {
+                        continue;
+                    }
+                    let m = self.value(i as i64);
                     match n {
-                        None => n = Some(*m),
-                        Some(nn) => if *m < nn {
-                            n = Some(*m)
+                        None => n = Some(m),
+                        Some(nn) => if cmp(m, nn) {
+                            n = Some(m)
                         },
                     }
                 }
-            },
-            &None => for i in 0..self.len {
-                let mut m = self.data.get(i);
-                match n {
-                    None => n = Some(*m),
-                    Some(nn) => if *m < nn {
-                        n = Some(*m)
-                    },
-                }
-            },
+                n
+            }
         }
-        n
-    }
 
-    /// Determine the maximum value in the array
-    pub fn max(&self) -> Option<T> {
-        let mut n: Option<T> = None;
-        match &self.validity_bitmap {
-            &Some(ref bitmap) => for i in 0..self.len {
-                if bitmap.is_set(i) {
-                    let mut m = self.data.get(i);
-                    match n {
-                        None => n = Some(*m),
-                        Some(nn) => if *m > nn {
-                            n = Some(*m)
-                        },
-                    }
+        /// Constructs a primitive array from a vector. Should only be used for testing.
+        impl From<Vec<$native_ty>> for PrimitiveArray<$native_ty> {
+            fn from(data: Vec<$native_ty>) -> Self {
+                let array_data = ArrayData::builder($data_ty)
+                    .len(data.len() as i64)
+                    .add_buffer(Buffer::from(data.to_byte_slice()))
+                    .build();
+                PrimitiveArray::from(array_data)
+            }
+        }
+
+        impl From<Vec<Option<$native_ty>>> for PrimitiveArray<$native_ty> {
+            fn from(data: Vec<Option<$native_ty>>) -> Self {
+                const TY_SIZE: usize = mem::size_of::<$native_ty>();
+                const NULL: [u8; TY_SIZE] = [0; TY_SIZE];
+
+                let data_len = data.len() as i64;
+                let size = bit_util::round_upto_multiple_of_64(data_len) as usize;
+                let mut null_buffer = Vec::with_capacity(size);
+                unsafe {
+                    ptr::write_bytes(null_buffer.as_mut_ptr(), 0, size);
+                    null_buffer.set_len(size);
                 }
-            },
-            &None => for i in 0..self.len {
-                let mut m = self.data.get(i);
-                match n {
-                    None => n = Some(*m),
-                    Some(nn) => if *m > nn {
-                        n = Some(*m)
-                    },
+                let mut value_buffer: Vec<u8> = Vec::with_capacity(size * TY_SIZE);
+
+                let mut i = 0;
+                for n in data {
+                    if let Some(v) = n {
+                        bit_util::set_bit(&mut null_buffer[..], i);
+                        value_buffer.extend_from_slice(&v.to_byte_slice());
+                    } else {
+                        value_buffer.extend_from_slice(&NULL);
+                    }
+                    i += 1;
                 }
-            },
+
+                let array_data = ArrayData::builder($data_ty)
+                    .len(data_len)
+                    .add_buffer(Buffer::from(Buffer::from(value_buffer)))
+                    .null_bit_buffer(Buffer::from(null_buffer))
+                    .build();
+                PrimitiveArray::from(array_data)
+            }
         }
-        n
-    }
+    };
 }
 
-/// Implement the Add operation for types that support Add
-impl<T> PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType + Add<Output = T>,
-{
-    pub fn add(&self, other: &PrimitiveArray<T>) -> PrimitiveArray<T> {
-        let mut builder: Builder<T> = Builder::new();
-        for i in 0..self.len {
-            let x = *self.data.get(i) + *other.data.get(i);
-            builder.push(x);
+/// Constructs a `PrimitiveArray` from an array data reference.
+impl<T: ArrowPrimitiveType> From<ArrayDataRef> for PrimitiveArray<T> {
+    fn from(data: ArrayDataRef) -> Self {
+        assert!(data.buffers().len() == 1);
+        let raw_values = data.buffers()[0].raw_data();
+        assert!(memory::is_aligned::<u8>(raw_values, mem::align_of::<T>()));
+        Self {
+            data: data,
+            raw_values: RawPtrBox::new(raw_values as *const T),
         }
-        PrimitiveArray::from(builder.finish())
     }
 }
 
-impl<T> Array for PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn len(&self) -> usize {
-        self.len
+impl<T: ArrowPrimitiveType> Array for PrimitiveArray<T> {
+    fn as_any(&self) -> &Any {
+        self
+    }
+
+    fn data(&self) -> ArrayDataRef {
+        self.data.clone()
+    }
+
+    fn data_ref(&self) -> &ArrayDataRef {
+        &self.data
+    }
+}
+
+def_primitive_array!(DataType::Boolean, bool);
+def_primitive_array!(DataType::UInt8, u8);
+def_primitive_array!(DataType::UInt16, u16);
+def_primitive_array!(DataType::UInt32, u32);
+def_primitive_array!(DataType::UInt64, u64);
+def_primitive_array!(DataType::Int8, i8);
+def_primitive_array!(DataType::Int16, i16);
+def_primitive_array!(DataType::Int32, i32);
+def_primitive_array!(DataType::Int64, i64);
+def_primitive_array!(DataType::Float32, f32);
+def_primitive_array!(DataType::Float64, f64);
+
+/// A list array where each element is a variable-sized sequence of values with the same
+/// type.
+pub struct ListArray {
+    data: ArrayDataRef,
+    values: ArrayRef,
+    value_offsets: RawPtrBox<i32>,
+}
+
+impl ListArray {
+    /// Returns an reference to the values of this list.
+    pub fn values(&self) -> ArrayRef {
+        self.values.clone()
+    }
+
+    /// Returns a clone of the value type of this list.
+    pub fn value_type(&self) -> DataType {
+        self.values.data().data_type().clone()
+    }
+
+    /// Returns the offset for value at index `i`.
+    ///
+    /// Note this doesn't do any bound checking, for performance reason.
+    #[inline]
+    pub fn value_offset(&self, i: i64) -> i32 {
+        self.value_offset_at(self.data.offset() + i)
     }
-    fn null_count(&self) -> usize {
-        self.null_count
+
+    /// Returns the length for value at index `i`.
+    ///
+    /// Note this doesn't do any bound checking, for performance reason.
+    #[inline]
+    pub fn value_length(&self, mut i: i64) -> i32 {
+        i += self.data.offset();
+        self.value_offset_at(i + 1) - self.value_offset_at(i)
     }
-    fn validity_bitmap(&self) -> &Option<Bitmap> {
-        &self.validity_bitmap
+
+    #[inline]
+    fn value_offset_at(&self, i: i64) -> i32 {
+        unsafe { *self.value_offsets.get().offset(i as isize) }
     }
+}
+
+/// Constructs a `ListArray` from an array data reference.
+impl From<ArrayDataRef> for ListArray {
+    fn from(data: ArrayDataRef) -> Self {
+        assert!(data.buffers().len() == 1);
+        assert!(data.child_data().len() == 1);
+        let values = make_array(data.child_data()[0].clone());
+        let raw_value_offsets = data.buffers()[0].raw_data();
+        assert!(memory::is_aligned(
+            raw_value_offsets,
+            mem::align_of::<i32>()
+        ));
+        let value_offsets = raw_value_offsets as *const i32;
+        unsafe {
+            assert!(*value_offsets.offset(0) == 0);
+            assert!(*value_offsets.offset(data.len() as isize) == values.data().len() as i32);
+        }
+        Self {
+            data: data.clone(),
+            values: values,
+            value_offsets: RawPtrBox::new(value_offsets),
+        }
+    }
+}
+
+impl Array for ListArray {
     fn as_any(&self) -> &Any {
         self
     }
+
+    fn data(&self) -> ArrayDataRef {
+        self.data.clone()
+    }
+
+    fn data_ref(&self) -> &ArrayDataRef {
+        &self.data
+    }
 }
 
-/// Create a BufferArray<T> from a Buffer<T> without null values
-impl<T> From<Buffer<T>> for PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn from(data: Buffer<T>) -> Self {
-        PrimitiveArray {
-            len: data.len(),
-            data: data,
-            validity_bitmap: None,
-            null_count: 0,
+/// A special type of `ListArray` whose elements are binaries.
+pub struct BinaryArray {
+    data: ArrayDataRef,
+    value_offsets: RawPtrBox<i32>,
+    value_data: RawPtrBox<u8>,
+}
+
+impl BinaryArray {
+    /// Returns the element at index `i` as a byte slice.
+    pub fn get_value(&self, i: i64) -> &[u8] {
+        assert!(i >= 0 && i < self.data.len());
+        let offset = i.checked_add(self.data.offset()).unwrap();
+        unsafe {
+            let pos = self.value_offset_at(offset);
+            ::std::slice::from_raw_parts(
+                self.value_data.get().offset(pos as isize),
+                (self.value_offset_at(offset + 1) - pos) as usize,
+            )
         }
     }
+
+    /// Returns the element at index `i` as a string.
+    ///
+    /// Note this doesn't do any bound checking, for performance reason.
+    pub fn get_string(&self, i: i64) -> String {
+        let slice = self.get_value(i);
+        unsafe { String::from_utf8_unchecked(Vec::from(slice)) }
+    }
+
+    /// Returns the offset for the element at index `i`.
+    ///
+    /// Note this doesn't do any bound checking, for performance reason.
+    #[inline]
+    pub fn value_offset(&self, i: i64) -> i32 {
+        self.value_offset_at(i)
+    }
+
+    /// Returns the length for the element at index `i`.
+    ///
+    /// Note this doesn't do any bound checking, for performance reason.
+    #[inline]
+    pub fn value_length(&self, mut i: i64) -> i32 {
+        i += self.data.offset();
+        self.value_offset_at(i + 1) - self.value_offset_at(i)
+    }
+
+    #[inline]
+    fn value_offset_at(&self, i: i64) -> i32 {
+        unsafe { *self.value_offsets.get().offset(i as isize) }
+    }
 }
 
-/// Create a BufferArray<T> from a Vec<T> of primitive values
-impl<T> From<Vec<T>> for PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType + 'static,
-{
-    fn from(vec: Vec<T>) -> Self {
-        PrimitiveArray::from(Buffer::from(vec))
+impl From<ArrayDataRef> for BinaryArray {
+    fn from(data: ArrayDataRef) -> Self {
+        assert!(data.buffers().len() == 2);
+        let raw_value_offsets = data.buffers()[0].raw_data();
+        assert!(memory::is_aligned(
+            raw_value_offsets,
+            mem::align_of::<i32>()
+        ));
+        let value_data = data.buffers()[1].raw_data();
+        Self {
+            data: data.clone(),
+            value_offsets: RawPtrBox::new(raw_value_offsets as *const i32),
+            value_data: RawPtrBox::new(value_data),
+        }
     }
 }
 
-/// Create a BufferArray<T> from a Vec<Optional<T>> with null handling
-impl<T> From<Vec<Option<T>>> for PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType + 'static,
-{
-    fn from(v: Vec<Option<T>>) -> Self {
-        let mut builder: Builder<T> = Builder::with_capacity(v.len());
-        builder.set_len(v.len());
-        let mut null_count = 0;
-        let mut validity_bitmap = Bitmap::new(v.len());
-        for i in 0..v.len() {
-            match v[i] {
-                Some(value) => builder.set(i, value),
-                None => {
-                    null_count += 1;
-                    validity_bitmap.clear(i);
-                }
-            }
+impl<'a> From<Vec<&'a str>> for BinaryArray {
+    fn from(v: Vec<&'a str>) -> Self {
+        let mut offsets = vec![];
+        let mut values = vec![];
+        let mut length_so_far = 0;
+        offsets.push(length_so_far);
+        for s in &v {
+            length_so_far += s.len() as i32;
+            offsets.push(length_so_far as i32);
+            values.extend_from_slice(s.as_bytes());
         }
-        PrimitiveArray::new(builder.finish(), null_count, Some(validity_bitmap))
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .len(v.len() as i64)
+            .add_buffer(Buffer::from(offsets.to_byte_slice()))
+            .add_buffer(Buffer::from(&values[..]))
+            .build();
+        BinaryArray::from(array_data)
+    }
+}
+
+impl Array for BinaryArray {
+    fn as_any(&self) -> &Any {
+        self
+    }
+
+    fn data(&self) -> ArrayDataRef {
+        self.data.clone()
+    }
+
+    fn data_ref(&self) -> &ArrayDataRef {
+        &self.data
     }
 }
 
-/// An Array of structs
+/// A nested array type where each child (called *field*) is represented by a separate array.
 pub struct StructArray {
-    len: usize,
-    columns: Vec<Arc<Array>>,
-    null_count: usize,
-    validity_bitmap: Option<Bitmap>,
+    data: ArrayDataRef,
+    boxed_fields: Vec<ArrayRef>,
 }
 
 impl StructArray {
-    pub fn num_columns(&self) -> usize {
-        self.columns.len()
+    /// Returns the field at `pos`.
+    pub fn column(&self, pos: usize) -> &ArrayRef {
+        &self.boxed_fields[pos]
     }
-    pub fn column(&self, i: usize) -> &Arc<Array> {
-        &self.columns[i]
+}
+
+impl From<ArrayDataRef> for StructArray {
+    fn from(data: ArrayDataRef) -> Self {
+        let mut boxed_fields = vec![];
+        for cd in data.child_data() {
+            boxed_fields.push(make_array(cd.clone()));
+        }
+        Self {
+            data: data,
+            boxed_fields: boxed_fields,
+        }
     }
 }
 
 impl Array for StructArray {
-    fn len(&self) -> usize {
-        self.len
-    }
-    fn null_count(&self) -> usize {
-        self.null_count
-    }
-    fn validity_bitmap(&self) -> &Option<Bitmap> {
-        &self.validity_bitmap
-    }
     fn as_any(&self) -> &Any {
         self
     }
+
+    fn data(&self) -> ArrayDataRef {
+        self.data.clone()
+    }
+
+    fn data_ref(&self) -> &ArrayDataRef {
+        &self.data
+    }
 }
 
-/// Create a StructArray from a list of arrays representing the fields of the struct. The fields
-/// must be in the same order as the schema defining the struct.
-impl From<Vec<Arc<Array>>> for StructArray {
-    fn from(data: Vec<Arc<Array>>) -> Self {
-        StructArray {
-            len: data[0].len(),
-            columns: data,
-            null_count: 0,
-            validity_bitmap: None,
-        }
+impl From<Vec<(Field, ArrayRef)>> for StructArray {
+    fn from(v: Vec<(Field, ArrayRef)>) -> Self {
+        let (field_types, field_values): (Vec<_>, Vec<_>) = v.into_iter().unzip();
+        let data = ArrayData::builder(DataType::Struct(field_types))
+            .child_data(field_values.into_iter().map(|a| a.data()).collect())
+            .build();
+        StructArray::from(data)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use std::thread;
 
+    use super::{Array, BinaryArray, ListArray, PrimitiveArray, StructArray};
+    use array_data::ArrayData;
+    use buffer::Buffer;
+    use datatypes::{DataType, Field, ToByteSlice};
+    use memory;
+
     #[test]
-    fn array_data_from_list_u8() {
-        let mut b: ListBuilder<u8> = ListBuilder::new();
-        b.push(&[1, 2, 3, 4, 5]);
-        b.push(&[5, 4, 3, 2, 1]);
-        let array_data = ListArray::from(b.finish());
-        assert_eq!(2, array_data.len());
+    fn test_primitive_array_from_vec() {
+        let buf = Buffer::from(&[0, 1, 2, 3, 4].to_byte_slice());
+        let buf2 = buf.clone();
+        let pa = PrimitiveArray::<i32>::new(5, buf, 0, 0);
+        let slice = unsafe { ::std::slice::from_raw_parts(pa.raw_values(), 5) };
+        assert_eq!(buf2, pa.values());
+        assert_eq!(&[0, 1, 2, 3, 4], slice);
+        assert_eq!(5, pa.len());
+        assert_eq!(0, pa.offset());
+        assert_eq!(0, pa.null_count());
+        for i in 0..5 {
+            assert!(!pa.is_null(i));
+            assert!(pa.is_valid(i));
+            assert_eq!(i as i32, pa.value(i));
+        }
     }
 
     #[test]
-    fn array_from_list_u8() {
-        let mut b: ListBuilder<u8> = ListBuilder::new();
-        b.push("Hello, ".as_bytes());
-        b.push("World!".as_bytes());
-        let array = ListArray::from(b.finish());
-        // downcast back to the data
-        let array_list_u8 = array.as_any().downcast_ref::<ListArray<u8>>().unwrap();
-        assert_eq!(2, array_list_u8.len());
-        assert_eq!("Hello, ", str::from_utf8(array_list_u8.get(0)).unwrap());
-        assert_eq!("World!", str::from_utf8(array_list_u8.get(1)).unwrap());
+    fn test_primitive_array_from_vec_option() {
+        // Test building a primitive array with null values
+        let pa = PrimitiveArray::<i32>::from(vec![Some(0), None, Some(2), None, Some(4)]);
+        assert_eq!(5, pa.len());
+        assert_eq!(0, pa.offset());
+        assert_eq!(2, pa.null_count());
+        for i in 0..5 {
+            if i % 2 == 0 {
+                assert!(!pa.is_null(i));
+                assert!(pa.is_valid(i));
+                assert_eq!(i as i32, pa.value(i));
+            } else {
+                assert!(pa.is_null(i));
+                assert!(!pa.is_valid(i));
+            }
+        }
     }
 
     #[test]
-    fn test_from_bool() {
-        let a = PrimitiveArray::from(vec![false, false, true, false]);
-        assert_eq!(4, a.len());
-        assert_eq!(0, a.null_count());
+    fn test_primitive_array_builder() {
+        // Test building an primitive array with ArrayData builder and offset
+        let buf = Buffer::from(&[0, 1, 2, 3, 4].to_byte_slice());
+        let buf2 = buf.clone();
+        let data = ArrayData::builder(DataType::Int32)
+            .len(5)
+            .offset(2)
+            .add_buffer(buf)
+            .build();
+        let pa = PrimitiveArray::<i32>::from(data);
+        assert_eq!(buf2, pa.values());
+        assert_eq!(5, pa.len());
+        assert_eq!(0, pa.null_count());
+        for i in 0..3 {
+            assert_eq!((i + 2) as i32, pa.value(i));
+        }
     }
 
     #[test]
-    fn test_from_f32() {
-        let a = PrimitiveArray::from(vec![1.23, 2.34, 3.45, 4.56]);
-        assert_eq!(4, a.len());
+    #[should_panic(expected = "")]
+    fn test_primitive_array_invalid_buffer_len() {
+        let data = ArrayData::builder(DataType::Int32).len(5).build();
+        PrimitiveArray::<i32>::from(data);
     }
 
     #[test]
-    fn test_from_i32() {
-        let a = PrimitiveArray::from(vec![15, 14, 13, 12, 11]);
-        assert_eq!(5, a.len());
+    fn test_list_array() {
+        // Construct a value array
+        let value_data = ArrayData::builder(DataType::Int32)
+            .len(7)
+            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .build();
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[0, 1, 2], [3, 4, 5], [6, 7]]
+        let value_offsets = Buffer::from(&[0, 2, 5, 7].to_byte_slice());
+
+        // Construct a list array from the above two
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type.clone())
+            .len(3)
+            .add_buffer(value_offsets.clone())
+            .add_child_data(value_data.clone())
+            .build();
+        let list_array = ListArray::from(list_data);
+
+        let values = list_array.values();
+        assert_eq!(value_data, values.data());
+        assert_eq!(DataType::Int32, list_array.value_type());
+        assert_eq!(3, list_array.len());
+        assert_eq!(0, list_array.null_count());
+        assert_eq!(5, list_array.value_offset(2));
+        assert_eq!(2, list_array.value_length(2));
+        for i in 0..3 {
+            assert!(list_array.is_valid(i as i64));
+            assert!(!list_array.is_null(i as i64));
+        }
+
+        // Now test with a non-zero offset
+        let list_data = ArrayData::builder(list_data_type)
+            .len(3)
+            .offset(1)
+            .add_buffer(value_offsets)
+            .add_child_data(value_data.clone())
+            .build();
+        let list_array = ListArray::from(list_data);
+
+        let values = list_array.values();
+        assert_eq!(value_data, values.data());
+        assert_eq!(DataType::Int32, list_array.value_type());
+        assert_eq!(3, list_array.len());
+        assert_eq!(0, list_array.null_count());
+        assert_eq!(5, list_array.value_offset(1));
+        assert_eq!(2, list_array.value_length(1));
     }
 
     #[test]
-    fn test_from_empty_vec() {
-        let v: Vec<i32> = vec![];
-        let a = PrimitiveArray::from(v);
-        assert_eq!(0, a.len());
+    #[should_panic(expected = "")]
+    fn test_list_array_invalid_buffer_len() {
+        let value_data = ArrayData::builder(DataType::Int32)
+            .len(7)
+            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .build();
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type)
+            .len(3)
+            .add_child_data(value_data)
+            .build();
+        ListArray::from(list_data);
     }
 
     #[test]
-    fn test_from_optional_i32() {
-        let a = PrimitiveArray::from(vec![Some(1), None, Some(2), Some(3), None]);
-        assert_eq!(5, a.len());
-        assert_eq!(2, a.null_count());
-        // 1 == not null
-        match a.validity_bitmap() {
-            &Some(ref validity_bitmap) => {
-                assert_eq!(true, validity_bitmap.is_set(0));
-                assert_eq!(false, validity_bitmap.is_set(1));
-                assert_eq!(true, validity_bitmap.is_set(2));
-                assert_eq!(true, validity_bitmap.is_set(3));
-                assert_eq!(false, validity_bitmap.is_set(4));
-            }
-            _ => panic!(),
+    #[should_panic(expected = "")]
+    fn test_list_array_invalid_child_array_len() {
+        let value_offsets = Buffer::from(&[0, 2, 5, 7].to_byte_slice());
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type)
+            .len(3)
+            .add_buffer(value_offsets)
+            .build();
+        ListArray::from(list_data);
+    }
+
+    #[test]
+    #[should_panic(expected = "")]
+    fn test_list_array_invalid_value_offset_start() {
+        let value_data = ArrayData::builder(DataType::Int32)
+            .len(7)
+            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .build();
+
+        let value_offsets = Buffer::from(&[2, 2, 5, 7].to_byte_slice());
+
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type.clone())
+            .len(3)
+            .add_buffer(value_offsets.clone())
+            .add_child_data(value_data.clone())
+            .build();
+        ListArray::from(list_data);
+    }
+
+    #[test]
+    #[should_panic(expected = "")]
+    fn test_list_array_invalid_value_offset_end() {
+        let value_data = ArrayData::builder(DataType::Int32)
+            .len(7)
+            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .build();
+
+        let value_offsets = Buffer::from(&[0, 2, 5, 8].to_byte_slice());
+
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type.clone())
+            .len(3)
+            .add_buffer(value_offsets.clone())
+            .add_child_data(value_data.clone())
+            .build();
+        ListArray::from(list_data);
+    }
+
+    #[test]
+    fn test_binary_array() {
+        let values: [u8; 12] = [
+            b'h', b'e', b'l', b'l', b'o', b'p', b'a', b'r', b'q', b'u', b'e', b't',
+        ];
+        let offsets: [i32; 4] = [0, 5, 5, 12];
+
+        // Array data: ["hello", "", "parquet"]
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .len(3)
+            .add_buffer(Buffer::from(offsets.to_byte_slice()))
+            .add_buffer(Buffer::from(&values[..]))
+            .build();
+        let binary_array = BinaryArray::from(array_data);
+        assert_eq!(3, binary_array.len());
+        assert_eq!(0, binary_array.null_count());
+        assert_eq!([b'h', b'e', b'l', b'l', b'o'], binary_array.get_value(0));
+        assert_eq!("hello", binary_array.get_string(0));
+        assert_eq!([] as [u8; 0], binary_array.get_value(1));
+        assert_eq!("", binary_array.get_string(1));
+        assert_eq!(
+            [b'p', b'a', b'r', b'q', b'u', b'e', b't'],
+            binary_array.get_value(2)
+        );
+        assert_eq!("parquet", binary_array.get_string(2));
+        assert_eq!(5, binary_array.value_offset(2));
+        assert_eq!(7, binary_array.value_length(2));
+        for i in 0..3 {
+            assert!(binary_array.is_valid(i as i64));
+            assert!(!binary_array.is_null(i as i64));
         }
+
+        // Test binary array with offset
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .len(4)
+            .offset(1)
+            .add_buffer(Buffer::from(offsets.to_byte_slice()))
+            .add_buffer(Buffer::from(&values[..]))
+            .build();
+        let binary_array = BinaryArray::from(array_data);
+        assert_eq!(
+            [b'p', b'a', b'r', b'q', b'u', b'e', b't'],
+            binary_array.get_value(1)
+        );
+        assert_eq!("parquet", binary_array.get_string(1));
+    }
+
+    #[test]
+    #[should_panic(expected = "")]
+    fn test_binary_array_get_value_index_out_of_bound() {
+        let values: [u8; 12] = [
+            b'h', b'e', b'l', b'l', b'o', b'p', b'a', b'r', b'q', b'u', b'e', b't',
+        ];
+        let offsets: [i32; 4] = [0, 5, 5, 12];
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .len(3)
+            .add_buffer(Buffer::from(offsets.to_byte_slice()))
+            .add_buffer(Buffer::from(&values[..]))
+            .build();
+        let binary_array = BinaryArray::from(array_data);
+        binary_array.get_value(4);
+    }
+
+    #[test]
+    fn test_struct_array() {
+        let boolean_data = ArrayData::builder(DataType::Boolean)
+            .len(4)
+            .add_buffer(Buffer::from([false, false, true, true].to_byte_slice()))
+            .build();
+        let int_data = ArrayData::builder(DataType::Int64)
+            .len(4)
+            .add_buffer(Buffer::from([42, 28, 19, 31].to_byte_slice()))
+            .build();
+        let mut field_types = vec![];
+        field_types.push(Field::new("a", DataType::Boolean, false));
+        field_types.push(Field::new("b", DataType::Int64, false));
+        let struct_array_data = ArrayData::builder(DataType::Struct(field_types))
+            .add_child_data(boolean_data.clone())
+            .add_child_data(int_data.clone())
+            .build();
+        let struct_array = StructArray::from(struct_array_data);
+
+        assert_eq!(boolean_data, struct_array.column(0).data());
+        assert_eq!(int_data, struct_array.column(1).data());
     }
 
     #[test]
-    fn test_struct() {
-        let a: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![1, 2, 3, 4, 5])));
-        let b: Arc<Array> = Arc::new(PrimitiveArray::from(Buffer::from(vec![
-            1.1, 2.2, 3.3, 4.4, 5.5,
-        ])));
+    #[should_panic(expected = "")]
+    fn test_primitive_array_alignment() {
+        let ptr = memory::allocate_aligned(8).unwrap();
+        let buf = Buffer::from_raw_parts(ptr, 8);
+        let buf2 = buf.slice(1);
+        let array_data = ArrayData::builder(DataType::Int32).add_buffer(buf2).build();
+        PrimitiveArray::<i32>::from(array_data);
+    }
+
+    #[test]
+    #[should_panic(expected = "")]
+    fn test_list_array_alignment() {
+        let ptr = memory::allocate_aligned(8).unwrap();
+        let buf = Buffer::from_raw_parts(ptr, 8);
+        let buf2 = buf.slice(1);
+
+        let values: [i32; 8] = [0; 8];
+        let value_data = ArrayData::builder(DataType::Int32)
+            .add_buffer(Buffer::from(values.to_byte_slice()))
+            .build();
+
+        let list_data_type = DataType::List(Box::new(DataType::Int32));
+        let list_data = ArrayData::builder(list_data_type.clone())
+            .add_buffer(buf2)
+            .add_child_data(value_data.clone())
+            .build();
+        ListArray::from(list_data);
+    }
+
+    #[test]
+    #[should_panic(expected = "")]
+    fn test_binary_array_alignment() {
+        let ptr = memory::allocate_aligned(8).unwrap();
+        let buf = Buffer::from_raw_parts(ptr, 8);
+        let buf2 = buf.slice(1);
+
+        let values: [u8; 12] = [0; 12];
 
-        let s = StructArray::from(vec![a, b]);
-        assert_eq!(2, s.num_columns());
-        assert_eq!(0, s.null_count());
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .add_buffer(buf2)
+            .add_buffer(Buffer::from(&values[..]))
+            .build();
+        BinaryArray::from(array_data);
     }
 
     #[test]
     fn test_buffer_array_min_max() {
-        let a = PrimitiveArray::from(Buffer::from(vec![5, 6, 7, 8, 9]));
+        let a = PrimitiveArray::<i32>::from(vec![5, 6, 7, 8, 9]);
         assert_eq!(5, a.min().unwrap());
         assert_eq!(9, a.max().unwrap());
     }
 
     #[test]
     fn test_buffer_array_min_max_with_nulls() {
-        let a = PrimitiveArray::from(vec![Some(5), None, None, Some(8), Some(9)]);
+        let a = PrimitiveArray::<i32>::from(vec![Some(5), None, None, Some(8), Some(9)]);
         assert_eq!(5, a.min().unwrap());
         assert_eq!(9, a.max().unwrap());
     }
 
     #[test]
     fn test_access_array_concurrently() {
-        let a = PrimitiveArray::from(Buffer::from(vec![5, 6, 7, 8, 9]));
+        let a = PrimitiveArray::<i32>::from(vec![5, 6, 7, 8, 9]);
 
-        let ret = thread::spawn(move || a.iter().collect::<Vec<i32>>()).join();
+        let ret = thread::spawn(move || a.value(3)).join();
 
         assert!(ret.is_ok());
-        assert_eq!(vec![5, 6, 7, 8, 9], ret.ok().unwrap());
+        assert_eq!(8, ret.ok().unwrap());
     }
 }
diff --git a/rust/src/array_data.rs b/rust/src/array_data.rs
new file mode 100644
index 0000000..6ad9e2e
--- /dev/null
+++ b/rust/src/array_data.rs
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use bitmap::Bitmap;
+use buffer::Buffer;
+use datatypes::DataType;
+use util::bit_util;
+
+/// An generic representation of Arrow array data which encapsulates common attributes and
+/// operations for Arrow array. Specific operations for different arrays types (e.g.,
+/// primitive, list, struct) are implemented in `Array`.
+#[derive(PartialEq, Debug)]
+pub struct ArrayData {
+    /// The data type for this array data
+    data_type: DataType,
+
+    /// The number of elements in this array data
+    len: i64,
+
+    /// The number of null elements in this array data
+    null_count: i64,
+
+    /// The offset into this array data
+    offset: i64,
+
+    /// The buffers for this array data. Note that depending on the array types, this
+    /// could hold different kinds of buffers (e.g., value buffer, value offset buffer)
+    /// at different positions.
+    buffers: Vec<Buffer>,
+
+    /// The child(ren) of this array. Only non-empty for nested types, currently
+    /// `ListArray` and `StructArray`.
+    child_data: Vec<ArrayDataRef>,
+
+    /// The null bitmap. A `None` value for this indicates all values are non-null in
+    /// this array.
+    null_bitmap: Option<Bitmap>,
+}
+
+pub type ArrayDataRef = Arc<ArrayData>;
+pub const UNKNOWN_NULL_COUNT: i64 = -1;
+
+impl ArrayData {
+    pub fn new(
+        data_type: DataType,
+        len: i64,
+        mut null_count: i64,
+        null_bit_buffer: Option<Buffer>,
+        offset: i64,
+        buffers: Vec<Buffer>,
+        child_data: Vec<ArrayDataRef>,
+    ) -> Self {
+        if null_count < 0 {
+            null_count = if let Some(ref buf) = null_bit_buffer {
+                len - bit_util::count_set_bits_offset(buf.data(), offset)
+            } else {
+                0
+            };
+        }
+        let null_bitmap = null_bit_buffer.map(Bitmap::from);
+        Self {
+            data_type,
+            len,
+            null_count,
+            offset,
+            buffers,
+            child_data,
+            null_bitmap,
+        }
+    }
+
+    /// Returns a builder to construct a `ArrayData` instance.
+    pub fn builder(data_type: DataType) -> ArrayDataBuilder {
+        ArrayDataBuilder::new(data_type)
+    }
+
+    /// Returns a reference to the data type of this array data
+    pub fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    /// Returns a slice of buffers for this array data
+    pub fn buffers(&self) -> &[Buffer] {
+        &self.buffers[..]
+    }
+
+    /// Returns a slice of children data arrays
+    pub fn child_data(&self) -> &[ArrayDataRef] {
+        &self.child_data[..]
+    }
+
+    /// Returns whether the element at index `i` is null
+    pub fn is_null(&self, i: i64) -> bool {
+        if let Some(ref b) = self.null_bitmap {
+            return !b.is_set(i);
+        }
+        false
+    }
+
+    /// Returns a reference to the null bitmap of this array data
+    pub fn null_bitmap(&self) -> &Option<Bitmap> {
+        &self.null_bitmap
+    }
+
+    /// Returns whether the element at index `i` is not null
+    pub fn is_valid(&self, i: i64) -> bool {
+        if let Some(ref b) = self.null_bitmap {
+            return b.is_set(i);
+        }
+        true
+    }
+
+    /// Returns the length (i.e., number of elements) of this array
+    pub fn len(&self) -> i64 {
+        self.len
+    }
+
+    /// Returns the offset of this array
+    pub fn offset(&self) -> i64 {
+        self.offset
+    }
+
+    /// Returns the total number of nulls in this array
+    pub fn null_count(&self) -> i64 {
+        self.null_count
+    }
+}
+
+/// Builder for `ArrayData` type
+pub struct ArrayDataBuilder {
+    data_type: DataType,
+    len: i64,
+    null_count: i64,
+    null_bit_buffer: Option<Buffer>,
+    offset: i64,
+    buffers: Vec<Buffer>,
+    child_data: Vec<ArrayDataRef>,
+}
+
+impl ArrayDataBuilder {
+    pub fn new(data_type: DataType) -> Self {
+        Self {
+            data_type: data_type,
+            len: 0,
+            null_count: UNKNOWN_NULL_COUNT,
+            null_bit_buffer: None,
+            offset: 0,
+            buffers: vec![],
+            child_data: vec![],
+        }
+    }
+
+    pub fn len(mut self, n: i64) -> Self {
+        self.len = n;
+        self
+    }
+
+    pub fn null_count(mut self, n: i64) -> Self {
+        self.null_count = n;
+        self
+    }
+
+    pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
+        self.null_bit_buffer = Some(buf);
+        self
+    }
+
+    pub fn offset(mut self, n: i64) -> Self {
+        self.offset = n;
+        self
+    }
+
+    pub fn buffers(mut self, v: Vec<Buffer>) -> Self {
+        self.buffers = v;
+        self
+    }
+
+    pub fn add_buffer(mut self, b: Buffer) -> Self {
+        self.buffers.push(b);
+        self
+    }
+
+    pub fn child_data(mut self, v: Vec<ArrayDataRef>) -> Self {
+        self.child_data = v;
+        self
+    }
+
+    pub fn add_child_data(mut self, r: ArrayDataRef) -> Self {
+        self.child_data.push(r);
+        self
+    }
+
+    pub fn build(self) -> ArrayDataRef {
+        let data = ArrayData::new(
+            self.data_type,
+            self.len,
+            self.null_count,
+            self.null_bit_buffer,
+            self.offset,
+            self.buffers,
+            self.child_data,
+        );
+        Arc::new(data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::{ArrayData, DataType};
+    use buffer::Buffer;
+    use util::bit_util;
+
+    #[test]
+    fn test_new() {
+        let arr_data = ArrayData::new(DataType::Boolean, 10, 1, None, 2, vec![], vec![]);
+        assert_eq!(10, arr_data.len());
+        assert_eq!(1, arr_data.null_count());
+        assert_eq!(2, arr_data.offset());
+        assert_eq!(0, arr_data.buffers().len());
+        assert_eq!(0, arr_data.child_data().len());
+    }
+
+    #[test]
+    fn test_builder() {
+        let v = vec![0, 1, 2, 3];
+        let child_arr_data = Arc::new(ArrayData::new(
+            DataType::Int32,
+            10,
+            0,
+            None,
+            0,
+            vec![],
+            vec![],
+        ));
+        let b1 = Buffer::from(&v[..]);
+        let arr_data = ArrayData::builder(DataType::Int32)
+            .len(20)
+            .null_count(10)
+            .offset(5)
+            .add_buffer(b1)
+            .add_child_data(child_arr_data.clone())
+            .build();
+
+        assert_eq!(20, arr_data.len());
+        assert_eq!(10, arr_data.null_count());
+        assert_eq!(5, arr_data.offset());
+        assert_eq!(1, arr_data.buffers().len());
+        assert_eq!(&[0, 1, 2, 3], arr_data.buffers()[0].data());
+        assert_eq!(1, arr_data.child_data().len());
+        assert_eq!(child_arr_data, arr_data.child_data()[0]);
+    }
+
+    #[test]
+    fn test_null_count() {
+        let mut bit_v: [u8; 2] = [0; 2];
+        bit_util::set_bit(&mut bit_v, 0);
+        bit_util::set_bit(&mut bit_v, 3);
+        bit_util::set_bit(&mut bit_v, 10);
+        let arr_data = ArrayData::builder(DataType::Int32)
+            .len(16)
+            .null_bit_buffer(Buffer::from(bit_v))
+            .build();
+        assert_eq!(13, arr_data.null_count());
+
+        // Test with offset
+        let mut bit_v: [u8; 2] = [0; 2];
+        bit_util::set_bit(&mut bit_v, 0);
+        bit_util::set_bit(&mut bit_v, 3);
+        bit_util::set_bit(&mut bit_v, 10);
+        let arr_data = ArrayData::builder(DataType::Int32)
+            .len(16)
+            .offset(2)
+            .null_bit_buffer(Buffer::from(bit_v))
+            .build();
+        assert_eq!(14, arr_data.null_count());
+    }
+}
diff --git a/rust/src/bitmap.rs b/rust/src/bitmap.rs
index 27bf2d6..2fdcf87 100644
--- a/rust/src/bitmap.rs
+++ b/rust/src/bitmap.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use super::buffer::Buffer;
+use util::bit_util;
 
+#[derive(PartialEq, Debug)]
 pub struct Bitmap {
-    bits: Buffer<u8>,
+    bits: Buffer,
 }
 
 impl Bitmap {
@@ -35,7 +37,7 @@ impl Bitmap {
             v.push(255); // 1 is not null
         }
         Bitmap {
-            bits: Buffer::from(v),
+            bits: Buffer::from(&v[..]),
         }
     }
 
@@ -43,21 +45,14 @@ impl Bitmap {
         self.bits.len()
     }
 
-    pub fn is_set(&self, i: usize) -> bool {
-        let byte_offset = i / 8;
-        self.bits.get(byte_offset) & (1_u8 << ((i % 8) as u8)) > 0
-    }
-
-    pub fn set(&mut self, i: usize) {
-        let byte_offset = i / 8;
-        let v: u8 = { self.bits.get(byte_offset) | (1_u8 << ((i % 8) as u8)) };
-        self.bits.set(byte_offset, v);
+    pub fn is_set(&self, i: i64) -> bool {
+        bit_util::get_bit(self.bits.data(), i)
     }
+}
 
-    pub fn clear(&mut self, i: usize) {
-        let byte_offset = i / 8;
-        let v: u8 = self.bits.get(byte_offset) ^ (1_u8 << ((i % 8) as u8));
-        self.bits.set(byte_offset, v);
+impl From<Buffer> for Bitmap {
+    fn from(buf: Buffer) -> Self {
+        Self { bits: buf }
     }
 }
 
@@ -73,13 +68,16 @@ mod tests {
     }
 
     #[test]
-    fn test_set_clear_bit() {
-        let mut b = Bitmap::new(64 * 8);
-        assert_eq!(true, b.is_set(12));
-        b.clear(12);
-        assert_eq!(false, b.is_set(12));
-        b.set(12);
-        assert_eq!(true, b.is_set(12));
+    fn test_bitmap_is_set() {
+        let bitmap = Bitmap::from(Buffer::from([0b01001010]));
+        assert_eq!(false, bitmap.is_set(0));
+        assert_eq!(true, bitmap.is_set(1));
+        assert_eq!(false, bitmap.is_set(2));
+        assert_eq!(true, bitmap.is_set(3));
+        assert_eq!(false, bitmap.is_set(4));
+        assert_eq!(false, bitmap.is_set(5));
+        assert_eq!(true, bitmap.is_set(6));
+        assert_eq!(false, bitmap.is_set(7));
     }
 
 }
diff --git a/rust/src/buffer.rs b/rust/src/buffer.rs
index bdc3601..624354a 100644
--- a/rust/src/buffer.rs
+++ b/rust/src/buffer.rs
@@ -15,283 +15,221 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
-use libc;
 use std::mem;
-use std::slice;
-
-use super::datatypes::*;
-use super::memory::*;
-
-/// Buffer<T> is essentially just a Vec<T> for fixed-width primitive types and the start of the
-/// memory region is aligned at a 64-byte boundary
-pub struct Buffer<T>
-where
-    T: ArrowPrimitiveType,
-{
-    /// Contiguous memory region holding instances of primitive T
-    data: *const T,
-    /// Number of elements in the buffer
+use std::sync::Arc;
+
+use memory;
+
+/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte
+/// boundary. Buffer is immutable.
+#[derive(PartialEq, Debug)]
+pub struct Buffer {
+    /// Reference-counted pointer to the internal byte buffer.
+    data: Arc<BufferData>,
+
+    /// The offset into the buffer.
+    offset: usize,
+}
+
+#[derive(Debug)]
+struct BufferData {
+    /// The raw pointer into the buffer bytes
+    ptr: *const u8,
+
+    /// The length of the buffer
     len: usize,
 }
 
-impl<T> Buffer<T>
-where
-    T: ArrowPrimitiveType,
-{
-    /// create a buffer from an existing region of memory (must already be byte-aligned)
-    pub unsafe fn from_raw_parts(data: *const T, len: usize) -> Self {
-        Buffer { data, len }
+impl PartialEq for BufferData {
+    fn eq(&self, other: &BufferData) -> bool {
+        if self.len != other.len {
+            return false;
+        }
+        unsafe { memory::memcmp(self.ptr, other.ptr, self.len as usize) == 0 }
     }
+}
 
-    /// Get the number of elements in the buffer
-    pub fn len(&self) -> usize {
-        self.len
+/// Release the underlying memory when the current buffer goes out of scope
+impl Drop for BufferData {
+    fn drop(&mut self) {
+        memory::free_aligned(self.ptr);
     }
+}
 
-    /// Get a pointer to the data contained by the buffer
-    pub fn data(&self) -> *const T {
-        self.data
+impl Buffer {
+    /// Creates a buffer from an existing memory region (must already be byte-aligned)
+    pub fn from_raw_parts(ptr: *const u8, len: usize) -> Self {
+        assert!(memory::is_aligned(ptr, 64));
+        let buf_data = BufferData { ptr: ptr, len: len };
+        Buffer {
+            data: Arc::new(buf_data),
+            offset: 0,
+        }
     }
 
-    pub fn slice(&self, start: usize, end: usize) -> &[T] {
-        assert!(end <= self.len);
-        assert!(start <= end);
-        unsafe { slice::from_raw_parts(self.data.offset(start as isize), end - start) }
+    /// Returns the number of bytes in the buffer
+    pub fn len(&self) -> usize {
+        self.data.len - self.offset as usize
     }
 
-    /// Get a reference to the value at the specified offset
-    pub fn get(&self, i: usize) -> &T {
-        assert!(i < self.len);
-        unsafe { &(*self.data.offset(i as isize)) }
+    /// Returns whether the buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.data.len - self.offset == 0
     }
 
-    /// Write to a slot in the buffer
-    pub fn set(&mut self, i: usize, v: T) {
-        assert!(i < self.len);
-        let p = self.data as *mut T;
-        unsafe {
-            *p.offset(i as isize) = v;
-        }
+    /// Returns the byte slice stored in this buffer
+    pub fn data(&self) -> &[u8] {
+        unsafe { ::std::slice::from_raw_parts(self.raw_data(), self.len()) }
     }
 
-    /// Return an iterator over the values in the buffer
-    pub fn iter(&self) -> BufferIterator<T> {
-        BufferIterator {
-            data: self.data,
-            len: self.len,
-            index: 0,
+    /// Returns a slice of this buffer, starting from `offset`.
+    pub fn slice(&self, offset: usize) -> Buffer {
+        assert!(self.offset + offset <= self.len());
+        Buffer {
+            data: self.data.clone(),
+            offset: self.offset + offset,
         }
     }
-}
 
-/// Release the underlying memory when the Buffer goes out of scope
-impl<T> Drop for Buffer<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn drop(&mut self) {
-        free_aligned(self.data as *const u8);
+    /// Returns a raw pointer for this buffer.
+    ///
+    /// Note that this should be used cautiously, and the returned pointer should not be
+    /// stored anywhere, to avoid dangling pointers.
+    pub fn raw_data(&self) -> *const u8 {
+        unsafe { self.data.ptr.offset(self.offset as isize) }
     }
-}
-
-/// Iterator over the elements of a buffer
-pub struct BufferIterator<T>
-where
-    T: ArrowPrimitiveType,
-{
-    data: *const T,
-    len: usize,
-    index: isize,
-}
 
-impl<T> Iterator for BufferIterator<T>
-where
-    T: ArrowPrimitiveType,
-{
-    type Item = T;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.index < self.len as isize {
-            let value = unsafe { *self.data.offset(self.index) };
-            self.index += 1;
-            Some(value)
-        } else {
-            None
-        }
+    /// Returns an empty buffer.
+    pub fn empty() -> Buffer {
+        Buffer::from_raw_parts(::std::ptr::null(), 0)
     }
 }
 
-/// Copy the memory from a Vec<T> into a newly allocated Buffer<T>
-impl<T> From<Vec<T>> for Buffer<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn from(v: Vec<T>) -> Self {
-        // allocate aligned memory buffer
-        let len = v.len();
-        let sz = mem::size_of::<T>();
-        let buffer = allocate_aligned((len * sz) as i64).unwrap();
+impl Clone for Buffer {
+    fn clone(&self) -> Buffer {
         Buffer {
-            len,
-            data: unsafe {
-                let dst = mem::transmute::<*const u8, *mut libc::c_void>(buffer);
-                libc::memcpy(
-                    dst,
-                    mem::transmute::<*const T, *const libc::c_void>(v.as_ptr()),
-                    len * sz,
-                );
-                mem::transmute::<*mut libc::c_void, *const T>(dst)
-            },
+            data: self.data.clone(),
+            offset: self.offset,
         }
     }
 }
 
-impl From<Bytes> for Buffer<u8> {
-    fn from(bytes: Bytes) -> Self {
-        // allocate aligned
-        let len = bytes.len();
-        let sz = mem::size_of::<u8>();
-        let buf_mem = allocate_aligned((len * sz) as i64).unwrap();
-        let dst = buf_mem as *mut libc::c_void;
-        Buffer {
-            len,
-            data: unsafe {
-                libc::memcpy(dst, bytes.as_ptr() as *const libc::c_void, len * sz);
-                dst as *mut u8
-            },
+/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
+/// allocated memory region.
+impl<T: AsRef<[u8]>> From<T> for Buffer {
+    fn from(p: T) -> Self {
+        // allocate aligned memory buffer
+        let slice = p.as_ref();
+        let len = slice.len() * mem::size_of::<u8>();
+        let buffer = memory::allocate_aligned((len) as i64).unwrap();
+        unsafe {
+            memory::memcpy(buffer, slice.as_ptr(), len);
         }
+        Buffer::from_raw_parts(buffer, len)
     }
 }
 
-unsafe impl<T: ArrowPrimitiveType> Sync for Buffer<T> {}
-unsafe impl<T: ArrowPrimitiveType> Send for Buffer<T> {}
+unsafe impl Sync for Buffer {}
+unsafe impl Send for Buffer {}
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::ptr::null_mut;
     use std::thread;
 
-    #[test]
-    fn test_buffer_i32() {
-        let b: Buffer<i32> = Buffer::from(vec![1, 2, 3, 4, 5]);
-        assert_eq!(5, b.len);
-    }
+    use super::Buffer;
 
     #[test]
-    fn test_iterator_i32() {
-        let b: Buffer<i32> = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let it = b.iter();
-        let v: Vec<i32> = it.map(|n| n + 1).collect();
-        assert_eq!(vec![2, 3, 4, 5, 6], v);
-    }
+    fn test_buffer_data_equality() {
+        let buf1 = Buffer::from(&[0, 1, 2, 3, 4]);
+        let mut buf2 = Buffer::from(&[0, 1, 2, 3, 4]);
+        assert_eq!(buf1, buf2);
 
-    #[test]
-    fn test_buffer_eq() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let b = Buffer::from(vec![5, 4, 3, 2, 1]);
-        let c = a
-            .iter()
-            .zip(b.iter())
-            .map(|(a, b)| a == b)
-            .collect::<Vec<bool>>();
-        assert_eq!(c, vec![false, false, true, false, false]);
-    }
+        // slice with same offset should still preserve equality
+        let buf3 = buf1.slice(2);
+        assert!(buf1 != buf3);
+        let buf4 = buf2.slice(2);
+        assert_eq!(buf3, buf4);
 
-    #[test]
-    fn test_buffer_lt() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let b = Buffer::from(vec![5, 4, 3, 2, 1]);
-        let c = a
-            .iter()
-            .zip(b.iter())
-            .map(|(a, b)| a < b)
-            .collect::<Vec<bool>>();
-        assert_eq!(c, vec![true, true, false, false, false]);
-    }
+        // unequal because of different elements
+        buf2 = Buffer::from(&[0, 0, 2, 3, 4]);
+        assert!(buf1 != buf2);
 
-    #[test]
-    fn test_buffer_gt() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let b = Buffer::from(vec![5, 4, 3, 2, 1]);
-        let c = a
-            .iter()
-            .zip(b.iter())
-            .map(|(a, b)| a > b)
-            .collect::<Vec<bool>>();
-        assert_eq!(c, vec![false, false, false, true, true]);
+        // unequal because of different length
+        buf2 = Buffer::from(&[0, 1, 2, 3]);
+        assert!(buf1 != buf2);
     }
 
     #[test]
-    fn test_buffer_add() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let b = Buffer::from(vec![5, 4, 3, 2, 1]);
-        let c = a
-            .iter()
-            .zip(b.iter())
-            .map(|(a, b)| a + b)
-            .collect::<Vec<i32>>();
-        assert_eq!(c, vec![6, 6, 6, 6, 6]);
-    }
+    fn test_from_raw_parts() {
+        let buf = Buffer::from_raw_parts(null_mut(), 0);
+        assert_eq!(0, buf.len());
+        assert_eq!(0, buf.data().len());
+        assert!(buf.raw_data().is_null());
 
-    #[test]
-    fn test_buffer_multiply() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let b = Buffer::from(vec![5, 4, 3, 2, 1]);
-        let c = a
-            .iter()
-            .zip(b.iter())
-            .map(|(a, b)| a * b)
-            .collect::<Vec<i32>>();
-        assert_eq!(c, vec![5, 8, 9, 8, 5]);
+        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
+        assert_eq!(5, buf.len());
+        assert!(!buf.raw_data().is_null());
+        assert_eq!(&[0, 1, 2, 3, 4], buf.data());
     }
 
     #[test]
-    #[should_panic]
-    fn test_get_out_of_bounds() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        a.get(123); // should panic
+    fn test_from_vec() {
+        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
+        assert_eq!(5, buf.len());
+        assert!(!buf.raw_data().is_null());
+        assert_eq!(&[0, 1, 2, 3, 4], buf.data());
     }
 
     #[test]
-    fn slice_empty_at_end() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        let s = a.slice(5, 5);
-        assert_eq!(0, s.len());
+    fn test_copy() {
+        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
+        let buf2 = buf.clone();
+        assert_eq!(5, buf2.len());
+        assert!(!buf2.raw_data().is_null());
+        assert_eq!(&[0, 1, 2, 3, 4], buf2.data());
     }
 
     #[test]
-    #[should_panic]
-    fn slice_start_out_of_bounds() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        a.slice(6, 6); // should panic
-    }
+    fn test_slice() {
+        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
+        let buf2 = buf.slice(2);
 
-    #[test]
-    #[should_panic]
-    fn slice_end_out_of_bounds() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        a.slice(0, 6); // should panic
+        assert_eq!(&[6, 8, 10], buf2.data());
+        assert_eq!(3, buf2.len());
+        assert_eq!(unsafe { buf.raw_data().offset(2) }, buf2.raw_data());
+
+        let buf3 = buf2.slice(1);
+        assert_eq!(&[8, 10], buf3.data());
+        assert_eq!(2, buf3.len());
+        assert_eq!(unsafe { buf.raw_data().offset(3) }, buf3.raw_data());
+
+        let buf4 = buf.slice(5);
+        let empty_slice: [u8; 0] = [];
+        assert_eq!(empty_slice, buf4.data());
+        assert_eq!(0, buf4.len());
+        assert!(buf4.is_empty());
     }
 
     #[test]
-    #[should_panic]
-    fn slice_end_before_start() {
-        let a = Buffer::from(vec![1, 2, 3, 4, 5]);
-        a.slice(3, 2); // should panic
+    #[should_panic(expected = "")]
+    fn test_slice_offset_out_of_bound() {
+        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
+        buf.slice(6);
     }
 
     #[test]
-    fn test_access_buffer_concurrently() {
+    fn test_access_concurrently() {
         let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
-        assert_eq!(vec![1, 2, 3, 4, 5], buffer.iter().collect::<Vec<i32>>());
+        let buffer2 = buffer.clone();
+        assert_eq!(&[1, 2, 3, 4, 5], buffer.data());
 
-        let collected_vec = thread::spawn(move || {
+        let buffer_copy = thread::spawn(move || {
             // access buffer in another thread.
-            buffer.iter().collect::<Vec<i32>>()
+            buffer.clone()
         }).join();
 
-        assert!(collected_vec.is_ok());
-        assert_eq!(vec![1, 2, 3, 4, 5], collected_vec.ok().unwrap());
+        assert!(buffer_copy.is_ok());
+        assert_eq!(buffer2, buffer_copy.ok().unwrap());
     }
 }
diff --git a/rust/src/builder.rs b/rust/src/builder.rs
index 833d6e8..ba9422d 100644
--- a/rust/src/builder.rs
+++ b/rust/src/builder.rs
@@ -143,11 +143,11 @@ where
     }
 
     /// Build a Buffer from the existing memory
-    pub fn finish(&mut self) -> Buffer<T> {
+    pub fn finish(&mut self) -> Buffer {
         assert!(!self.data.is_null());
-        let p = self.data as *const T;
+        let p = self.data;
         self.data = ptr::null_mut(); // ensure builder cannot be re-used
-        unsafe { Buffer::from_raw_parts(p, self.len) }
+        Buffer::from_raw_parts(p as *mut u8, self.len)
     }
 }
 
@@ -189,9 +189,6 @@ mod tests {
         }
         let a = b.finish();
         assert_eq!(5, a.len());
-        for i in 0..5 {
-            assert_eq!(&i, a.get(i as usize));
-        }
     }
 
     #[test]
@@ -202,9 +199,6 @@ mod tests {
         }
         let a = b.finish();
         assert_eq!(5, a.len());
-        for i in 0..5 {
-            assert_eq!(&i, a.get(i as usize));
-        }
     }
 
     #[test]
@@ -224,9 +218,6 @@ mod tests {
         b.push_slice("World!".as_bytes());
         let buffer = b.finish();
         assert_eq!(13, buffer.len());
-
-        let s = String::from_utf8(buffer.iter().collect::<Vec<u8>>()).unwrap();
-        assert_eq!("Hello, World!", s);
     }
 
     #[test]
diff --git a/rust/src/datatypes.rs b/rust/src/datatypes.rs
index 2adec0b..2ce0cc0 100644
--- a/rust/src/datatypes.rs
+++ b/rust/src/datatypes.rs
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::error::ArrowError;
-use serde_json::Value;
 use std::fmt;
+use std::mem::size_of;
+use std::slice::from_raw_parts;
+
+use error::ArrowError;
+use serde_json::Value;
 
 /// Arrow data type
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -62,6 +65,31 @@ impl ArrowPrimitiveType for i64 {}
 impl ArrowPrimitiveType for f32 {}
 impl ArrowPrimitiveType for f64 {}
 
+pub trait ToByteSlice {
+    /// Converts this instance into a byte slice.
+    fn to_byte_slice(&self) -> &[u8];
+}
+
+impl<T> ToByteSlice for [T]
+where
+    T: ArrowPrimitiveType,
+{
+    fn to_byte_slice(&self) -> &[u8] {
+        let raw_ptr = self.as_ptr() as *const T as *const u8;
+        unsafe { from_raw_parts(raw_ptr, self.len() * size_of::<T>()) }
+    }
+}
+
+impl<T> ToByteSlice for T
+where
+    T: ArrowPrimitiveType,
+{
+    fn to_byte_slice(&self) -> &[u8] {
+        let raw_ptr = self as *const T as *const u8;
+        unsafe { from_raw_parts(raw_ptr, size_of::<T>()) }
+    }
+}
+
 impl DataType {
     /// Parse a data type from a JSON representation
     fn from(json: &Value) -> Result<DataType, ArrowError> {
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index b67b31b..d498341 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -21,14 +21,16 @@ extern crate libc;
 #[macro_use]
 extern crate serde_json;
 
+extern crate rand;
+
 pub mod array;
+pub mod array_data;
 pub mod bitmap;
 pub mod buffer;
 pub mod builder;
 pub mod datatypes;
 pub mod error;
-pub mod list;
-pub mod list_builder;
 pub mod memory;
 pub mod memory_pool;
 pub mod record_batch;
+pub mod util;
diff --git a/rust/src/list.rs b/rust/src/list.rs
deleted file mode 100644
index 32dcb6f..0000000
--- a/rust/src/list.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::str;
-
-use super::buffer::Buffer;
-use super::datatypes::*;
-use super::list_builder::ListBuilder;
-
-/// List<T> is a nested type in which each array slot contains a variable-size sequence of values of
-/// the same type T
-pub struct List<T>
-where
-    T: ArrowPrimitiveType,
-{
-    /// Contiguous region of memory holding contents of the lists
-    data: Buffer<T>,
-    /// offsets to start of each array slot
-    offsets: Buffer<i32>,
-}
-
-impl<T> List<T>
-where
-    T: ArrowPrimitiveType,
-{
-    /// Create a List from raw parts
-    pub fn from_raw_parts(data: Buffer<T>, offsets: Buffer<i32>) -> Self {
-        List { data, offsets }
-    }
-
-    /// Get the length of the List (number of array slots)
-    pub fn len(&self) -> usize {
-        self.offsets.len() - 1
-    }
-
-    /// Get a reference to the raw data in the list
-    pub fn data(&self) -> &Buffer<T> {
-        &self.data
-    }
-
-    /// Get a reference to the offsets in the list
-    pub fn offsets(&self) -> &Buffer<i32> {
-        &self.offsets
-    }
-
-    /// Get the contents of a single array slot
-    pub fn get(&self, index: usize) -> &[T] {
-        let start = *self.offsets.get(index) as usize;
-        let end = *self.offsets.get(index + 1) as usize;
-        self.data.slice(start, end)
-    }
-}
-
-/// Create a List<u8> from a Vec<String>
-impl From<Vec<String>> for List<u8> {
-    fn from(v: Vec<String>) -> Self {
-        let mut b: ListBuilder<u8> = ListBuilder::with_capacity(v.len());
-        v.iter().for_each(|s| {
-            b.push(s.as_bytes());
-        });
-        b.finish()
-    }
-}
-
-/// Create a List<u8> from a Vec<&str>
-impl From<Vec<&'static str>> for List<u8> {
-    fn from(v: Vec<&'static str>) -> Self {
-        List::from(v.iter().map(|s| s.to_string()).collect::<Vec<String>>())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_utf8_slices() {
-        let list = List::from(vec!["this", "is", "a", "test"]);
-        assert_eq!(4, list.len());
-        assert_eq!("this", str::from_utf8(list.get(0)).unwrap());
-        assert_eq!("is", str::from_utf8(list.get(1)).unwrap());
-        assert_eq!("a", str::from_utf8(list.get(2)).unwrap());
-        assert_eq!("test", str::from_utf8(list.get(3)).unwrap());
-    }
-
-    #[test]
-    fn test_utf8_empty_strings() {
-        let list = List::from(vec!["", "", "", ""]);
-        assert_eq!(4, list.len());
-        assert_eq!("", str::from_utf8(list.get(0)).unwrap());
-        assert_eq!("", str::from_utf8(list.get(1)).unwrap());
-        assert_eq!("", str::from_utf8(list.get(2)).unwrap());
-        assert_eq!("", str::from_utf8(list.get(3)).unwrap());
-    }
-
-}
diff --git a/rust/src/list_builder.rs b/rust/src/list_builder.rs
deleted file mode 100644
index 970ff00..0000000
--- a/rust/src/list_builder.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::builder::*;
-use super::datatypes::*;
-use super::list::List;
-
-/// Builder for List<T>
-pub struct ListBuilder<T>
-where
-    T: ArrowPrimitiveType,
-{
-    data: Builder<T>,
-    offsets: Builder<i32>,
-}
-
-impl<T> ListBuilder<T>
-where
-    T: ArrowPrimitiveType,
-{
-    /// Create a ListBuilder with a default capacity
-    pub fn new() -> Self {
-        ListBuilder::with_capacity(64)
-    }
-
-    /// Create a ListBuilder with the specified capacity
-    pub fn with_capacity(n: usize) -> Self {
-        let data = Builder::with_capacity(n);
-        // take into account additional element (0), that we are pushing immediately
-        let mut offsets = Builder::with_capacity(n + 1);
-        offsets.push(0_i32);
-        ListBuilder { data, offsets }
-    }
-
-    /// Push one array slot to the builder
-    pub fn push(&mut self, slice: &[T]) {
-        self.data.push_slice(slice);
-        self.offsets.push(self.data.len() as i32);
-    }
-
-    /// Create an immutable List<T> from the builder
-    pub fn finish(&mut self) -> List<T> {
-        List::from_raw_parts(self.data.finish(), self.offsets.finish())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_list_u8_default_capacity() {
-        let mut b: ListBuilder<u8> = ListBuilder::new();
-        b.push("Hello, ".as_bytes());
-        b.push("World!".as_bytes());
-        let buffer = b.finish();
-
-        assert_eq!(2, buffer.len());
-        assert_eq!("Hello, ".as_bytes(), buffer.get(0));
-        assert_eq!("World!".as_bytes(), buffer.get(1));
-    }
-
-    #[test]
-    fn test_list_u8_zero_capacity() {
-        let mut b: ListBuilder<u8> = ListBuilder::with_capacity(0);
-        b.push("Hello, ".as_bytes());
-        b.push("World!".as_bytes());
-        let buffer = b.finish();
-        assert_eq!(2, buffer.len());
-        assert_eq!("Hello, ".as_bytes(), buffer.get(0));
-        assert_eq!("World!".as_bytes(), buffer.get(1));
-    }
-
-    #[test]
-    fn test_empty_lists() {
-        let mut b: ListBuilder<u8> = ListBuilder::new();
-        b.push("Hello, ".as_bytes());
-        b.push("".as_bytes());
-        b.push("World!".as_bytes());
-        let buffer = b.finish();
-
-        assert_eq!(3, buffer.len());
-        assert_eq!("Hello, ".as_bytes(), buffer.get(0));
-        assert_eq!("".as_bytes(), buffer.get(1));
-        assert_eq!("World!".as_bytes(), buffer.get(2));
-    }
-}
diff --git a/rust/src/memory.rs b/rust/src/memory.rs
index a62fcb2..adcfe2f 100644
--- a/rust/src/memory.rs
+++ b/rust/src/memory.rs
@@ -30,23 +30,23 @@ extern "C" {
 }
 
 #[cfg(windows)]
-pub fn allocate_aligned(size: i64) -> Result<*const u8, ArrowError> {
+pub fn allocate_aligned(size: i64) -> Result<*mut u8, ArrowError> {
     let page = unsafe { _aligned_malloc(size as libc::size_t, ALIGNMENT as libc::size_t) };
     match page {
         0 => Err(ArrowError::MemoryError(
             "Failed to allocate memory".to_string(),
         )),
-        _ => Ok(unsafe { mem::transmute::<libc::size_t, *const u8>(page) }),
+        _ => Ok(unsafe { mem::transmute::<libc::size_t, *mut u8>(page) }),
     }
 }
 
 #[cfg(not(windows))]
-pub fn allocate_aligned(size: i64) -> Result<*const u8, ArrowError> {
+pub fn allocate_aligned(size: i64) -> Result<*mut u8, ArrowError> {
     unsafe {
         let mut page: *mut libc::c_void = mem::uninitialized();
         let result = libc::posix_memalign(&mut page, ALIGNMENT, size as usize);
         match result {
-            0 => Ok(mem::transmute::<*mut libc::c_void, *const u8>(page)),
+            0 => Ok(mem::transmute::<*mut libc::c_void, *mut u8>(page)),
             _ => Err(ArrowError::MemoryError(
                 "Failed to allocate memory".to_string(),
             )),
@@ -68,6 +68,24 @@ pub fn free_aligned(p: *const u8) {
     }
 }
 
+pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) {
+    let src = mem::transmute::<*const u8, *const libc::c_void>(src);
+    let dst = mem::transmute::<*mut u8, *mut libc::c_void>(dst);
+    libc::memcpy(dst, src, len);
+}
+
+extern "C" {
+    #[inline]
+    pub fn memcmp(p1: *const u8, p2: *const u8, len: usize) -> i32;
+}
+
+/// Check if the pointer `p` is aligned to offset `a`.
+pub fn is_aligned<T>(p: *const T, a: usize) -> bool {
+    let a_minus_one = a.wrapping_sub(1);
+    let pmoda = p as usize & a_minus_one;
+    pmoda == 0
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -81,4 +99,18 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_is_aligned() {
+        // allocate memory aligned to 64-byte
+        let mut ptr = allocate_aligned(10).unwrap();
+        assert_eq!(true, is_aligned::<u8>(ptr, 1));
+        assert_eq!(true, is_aligned::<u8>(ptr, 2));
+        assert_eq!(true, is_aligned::<u8>(ptr, 4));
+
+        // now make the memory aligned to 63-byte
+        ptr = unsafe { ptr.offset(1) };
+        assert_eq!(true, is_aligned::<u8>(ptr, 1));
+        assert_eq!(false, is_aligned::<u8>(ptr, 2));
+        assert_eq!(false, is_aligned::<u8>(ptr, 4));
+    }
 }
diff --git a/rust/src/memory_pool.rs b/rust/src/memory_pool.rs
index c8deeac..207debc 100644
--- a/rust/src/memory_pool.rs
+++ b/rust/src/memory_pool.rs
@@ -26,7 +26,7 @@ use super::memory::{allocate_aligned, free_aligned};
 pub trait MemoryPool {
     /// Allocate memory.
     /// The implementation should ensures that allocated memory is aligned.
-    fn allocate(&self, size: usize) -> Result<*const u8, ArrowError>;
+    fn allocate(&self, size: usize) -> Result<*mut u8, ArrowError>;
 
     /// Reallocate memory.
     /// If the implementation doesn't support reallocating aligned memory, it allocates new memory
@@ -47,7 +47,7 @@ pub trait MemoryPool {
 struct LibcMemoryPool;
 
 impl MemoryPool for LibcMemoryPool {
-    fn allocate(&self, size: usize) -> Result<*const u8, ArrowError> {
+    fn allocate(&self, size: usize) -> Result<*mut u8, ArrowError> {
         allocate_aligned(size as i64)
     }
 
diff --git a/rust/src/record_batch.rs b/rust/src/record_batch.rs
index 4cb4573..4f8e82b 100644
--- a/rust/src/record_batch.rs
+++ b/rust/src/record_batch.rs
@@ -26,13 +26,13 @@ pub struct RecordBatch {
 }
 
 impl RecordBatch {
-    pub fn new(schema: Arc<Schema>, columns: Vec<Arc<Array>>) -> Self {
+    pub fn new(schema: Arc<Schema>, columns: Vec<ArrayRef>) -> Self {
         // assert that there are some columns
         assert!(columns.len() > 0);
         // assert that all columns have the same row count
-        let len = columns[0].len();
+        let len = columns[0].data().len();
         for i in 1..columns.len() {
-            assert_eq!(len, columns[i].len());
+            assert_eq!(len, columns[i].data().len());
         }
         RecordBatch { schema, columns }
     }
@@ -45,11 +45,11 @@ impl RecordBatch {
         self.columns.len()
     }
 
-    pub fn num_rows(&self) -> usize {
-        self.columns[0].len()
+    pub fn num_rows(&self) -> i64 {
+        self.columns[0].data().len()
     }
 
-    pub fn column(&self, i: usize) -> &Arc<Array> {
+    pub fn column(&self, i: usize) -> &ArrayRef {
         &self.columns[i]
     }
 }
@@ -60,6 +60,8 @@ unsafe impl Sync for RecordBatch {}
 #[cfg(test)]
 mod tests {
     use super::*;
+    use array_data::*;
+    use buffer::*;
 
     #[test]
     fn create_record_batch() {
@@ -68,8 +70,21 @@ mod tests {
             Field::new("b", DataType::Utf8, false),
         ]);
 
-        let a = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
-        let b = ListArray::from(vec!["a", "b", "c", "d", "e"]);
+        let v = vec![1, 2, 3, 4, 5];
+        let array_data = ArrayData::builder(DataType::Int32)
+            .len(5)
+            .add_buffer(Buffer::from(v.to_byte_slice()))
+            .build();
+        let a = PrimitiveArray::<i32>::from(array_data);
+
+        let v = vec![b'a', b'b', b'c', b'd', b'e'];
+        let offset_data = vec![0, 1, 2, 3, 4, 5, 6];
+        let array_data = ArrayData::builder(DataType::Utf8)
+            .len(5)
+            .add_buffer(Buffer::from(v.to_byte_slice()))
+            .add_buffer(Buffer::from(offset_data.to_byte_slice()))
+            .build();
+        let b = BinaryArray::from(array_data);
 
         let record_batch = RecordBatch::new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
 
@@ -80,7 +95,7 @@ mod tests {
             record_batch.schema().column(0).data_type()
         );
         assert_eq!(&DataType::Utf8, record_batch.schema().column(1).data_type());
-        assert_eq!(5, record_batch.column(0).len());
-        assert_eq!(5, record_batch.column(1).len());
+        assert_eq!(5, record_batch.column(0).data().len());
+        assert_eq!(5, record_batch.column(1).data().len());
     }
 }
diff --git a/rust/src/util/bit_util.rs b/rust/src/util/bit_util.rs
new file mode 100644
index 0000000..8d9a805
--- /dev/null
+++ b/rust/src/util/bit_util.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+static BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128];
+
+static POPCOUNT_TABLE: [u8; 256] = [
+    0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
+    1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
+    1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
+    2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
+    1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
+    2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
+    2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
+    3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
+];
+
+/// Returns the nearest number that is `>=` than `num` and is a multiple of 64
+#[inline]
+pub fn round_upto_multiple_of_64(num: i64) -> i64 {
+    round_upto_power_of_2(num, 64)
+}
+
+/// Returns the nearest multiple of `factor` that is `>=` than `num`. Here `factor` must
+/// be a power of 2.
+fn round_upto_power_of_2(num: i64, factor: i64) -> i64 {
+    debug_assert!(factor > 0 && (factor & (factor - 1)) == 0);
+    (num + (factor - 1)) & !(factor - 1)
+}
+
+/// Returns whether bit at position `i` in `data` is set or not
+#[inline]
+pub fn get_bit(data: &[u8], i: i64) -> bool {
+    (data[(i / 8) as usize] & BIT_MASK[(i % 8) as usize]) != 0
+}
+
+/// Sets bit at position `i` for `data`
+#[inline]
+pub fn set_bit(data: &mut [u8], i: i64) {
+    data[(i / 8) as usize] |= BIT_MASK[(i % 8) as usize]
+}
+
+/// Returns the number of 1-bits in `data`
+#[inline]
+pub fn count_set_bits(data: &[u8]) -> i64 {
+    let mut count: i64 = 0;
+    for u in data {
+        count += POPCOUNT_TABLE[*u as usize] as i64;
+    }
+    count
+}
+
+/// Returns the number of 1-bits in `data`, starting from `offset`.
+#[inline]
+pub fn count_set_bits_offset(data: &[u8], offset: i64) -> i64 {
+    debug_assert!(offset <= (data.len() * 8) as i64);
+
+    let start_byte_pos = (offset / 8) as usize;
+    let start_bit_pos = offset % 8;
+
+    if start_bit_pos == 0 {
+        count_set_bits(&data[start_byte_pos..])
+    } else {
+        let mut result = 0;
+        result += count_set_bits(&data[start_byte_pos + 1..]);
+        for i in start_bit_pos..8 {
+            if get_bit(&data[start_byte_pos..start_byte_pos + 1], i as i64) {
+                result += 1;
+            }
+        }
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::{thread_rng, Rng};
+    use std::collections::HashSet;
+
+    use super::*;
+
+    #[test]
+    fn test_round_upto_multiple_of_64() {
+        assert_eq!(0, round_upto_multiple_of_64(0));
+        assert_eq!(64, round_upto_multiple_of_64(1));
+        assert_eq!(64, round_upto_multiple_of_64(63));
+        assert_eq!(64, round_upto_multiple_of_64(64));
+        assert_eq!(128, round_upto_multiple_of_64(65));
+        assert_eq!(192, round_upto_multiple_of_64(129));
+    }
+
+    #[test]
+    fn test_get_bit() {
+        // 00001101
+        assert_eq!(true, get_bit(&[0b00001101], 0));
+        assert_eq!(false, get_bit(&[0b00001101], 1));
+        assert_eq!(true, get_bit(&[0b00001101], 2));
+        assert_eq!(true, get_bit(&[0b00001101], 3));
+
+        // 01001001 01010010
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 0));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 1));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 2));
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 3));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 4));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 5));
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 6));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 7));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 8));
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 9));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 10));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 11));
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 12));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 13));
+        assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 14));
+        assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 15));
+    }
+
+    #[test]
+    fn test_set_bit() {
+        let mut b = [0b00000000];
+        set_bit(&mut b, 0);
+        assert_eq!([0b00000001], b);
+        set_bit(&mut b, 2);
+        assert_eq!([0b00000101], b);
+        set_bit(&mut b, 5);
+        assert_eq!([0b00100101], b);
+    }
+
+    #[test]
+    fn test_get_set_bit_roundtrip() {
+        const NUM_BYTES: usize = 10;
+        const NUM_SETS: usize = 10;
+
+        let mut buffer: [u8; NUM_BYTES * 8] = [0; NUM_BYTES * 8];
+        let mut v = HashSet::new();
+        let mut rng = thread_rng();
+        for _ in 0..NUM_SETS {
+            let offset = rng.gen_range(0, 8 * NUM_BYTES);
+            v.insert(offset);
+            set_bit(&mut buffer[..], offset as i64);
+        }
+        for i in 0..NUM_BYTES * 8 {
+            assert_eq!(v.contains(&i), get_bit(&buffer[..], i as i64));
+        }
+    }
+
+    #[test]
+    fn test_count_bits_slice() {
+        assert_eq!(0, count_set_bits(&[0b00000000]));
+        assert_eq!(8, count_set_bits(&[0b11111111]));
+        assert_eq!(3, count_set_bits(&[0b00001101]));
+        assert_eq!(6, count_set_bits(&[0b01001001, 0b01010010]));
+    }
+
+    #[test]
+    fn test_count_bits_offset_slice() {
+        assert_eq!(8, count_set_bits_offset(&[0b11111111], 0));
+        assert_eq!(5, count_set_bits_offset(&[0b11111111], 3));
+        assert_eq!(0, count_set_bits_offset(&[0b11111111], 8));
+        assert_eq!(16, count_set_bits_offset(&[0b11111111, 0b11111111], 0));
+        assert_eq!(13, count_set_bits_offset(&[0b11111111, 0b11111111], 3));
+        assert_eq!(8, count_set_bits_offset(&[0b11111111, 0b11111111], 8));
+        assert_eq!(5, count_set_bits_offset(&[0b11111111, 0b11111111], 11));
+        assert_eq!(0, count_set_bits_offset(&[0b11111111, 0b11111111], 16));
+    }
+
+}
diff --git a/rust/examples/array_from_vec.rs b/rust/src/util/mod.rs
similarity index 75%
rename from rust/examples/array_from_vec.rs
rename to rust/src/util/mod.rs
index bf0f6c7..c20abba 100644
--- a/rust/examples/array_from_vec.rs
+++ b/rust/src/util/mod.rs
@@ -15,12 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate arrow;
-
-use arrow::array::*;
-
-fn main() {
-    // create a memory-aligned Arrow array from an existing Vec
-    let array = PrimitiveArray::from(vec![1, 2, 3, 4, 5]);
-    println!("array contents: {:?}", array.iter().collect::<Vec<i32>>());
-}
+pub mod bit_util;
+pub mod test_util;
diff --git a/rust/examples/array_from_builder.rs b/rust/src/util/test_util.rs
similarity index 53%
rename from rust/examples/array_from_builder.rs
rename to rust/src/util/test_util.rs
index d18953d..4ef48e6 100644
--- a/rust/examples/array_from_builder.rs
+++ b/rust/src/util/test_util.rs
@@ -15,28 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate arrow;
+use rand::{thread_rng, Rng};
 
-use arrow::array::*;
-use arrow::builder::*;
-
-fn main() {
-    let mut builder: Builder<i32> = Builder::new();
-    for i in 0..10 {
-        builder.push(i);
+/// Returns a vector of size `n`, filled with randomly generated bytes.
+pub fn random_bytes(n: usize) -> Vec<u8> {
+    let mut result = vec![];
+    let mut rng = thread_rng();
+    for _ in 0..n {
+        result.push(rng.gen_range(0, 255) & 0xFF);
     }
-    let buffer = builder.finish();
-
-    println!("buffer length: {}", buffer.len());
-    println!("buffer contents: {:?}", buffer.iter().collect::<Vec<i32>>());
-
-    // note that the builder can no longer be used once it has built a buffer, so either
-    // of the following calls will fail
-
-    //    builder.push(123);
-    //    builder.build();
-
-    // create a memory-aligned Arrow from the builder (zero-copy)
-    let array = PrimitiveArray::from(buffer);
-    println!("array contents: {:?}", array.iter().collect::<Vec<i32>>());
+    result
 }